You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2018/07/03 16:18:26 UTC
[3/4] impala git commit: IMPALA-4784: Remove InProcessStatestore
IMPALA-4784: Remove InProcessStatestore
InProcessStatestore was only used by statestore-test, expr-test and
session-expiry-test. With a slight refactor of the Statestore class,
InProcessStatestore becomes obsolete.
This patch moves the ownership of the ThriftServer into the Statestore
class, and Statestore::Init() now takes a port parameter
('state_store_port') instead of reading FLAGS_state_store_port directly.
We also remove the InProcessStatestore completely. A follow-on patch will
get rid of the InProcessImpalaServer too (IMPALA-6013).
Testing: Ran 'core' tests.
Change-Id: I2621873e593b36c9612a6402ac6c5d8e3b49cde9
Reviewed-on: http://gerrit.cloudera.org:8080/10843
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0a470168
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0a470168
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0a470168
Branch: refs/heads/master
Commit: 0a470168138b5f3254d7604a120eb2376a91c20c
Parents: f3b1c4b
Author: Sailesh Mukil <sa...@apache.org>
Authored: Tue Jun 26 10:15:26 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 3 08:21:48 2018 +0000
----------------------------------------------------------------------
be/src/exprs/expr-test.cc | 23 +++++++++++-----
be/src/service/session-expiry-test.cc | 11 +++++---
be/src/statestore/statestore-test.cc | 43 +++++++++++++++++++-----------
be/src/statestore/statestore.cc | 31 ++++++++++++++++++++-
be/src/statestore/statestore.h | 14 ++++++++--
be/src/statestore/statestored-main.cc | 24 +----------------
be/src/testutil/in-process-servers.cc | 36 -------------------------
be/src/testutil/in-process-servers.h | 36 -------------------------
8 files changed, 94 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 999f41a..e03f458 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -28,7 +28,6 @@
#include <boost/scoped_ptr.hpp>
#include <boost/unordered_map.hpp>
-#include "testutil/gtest-util.h"
#include "codegen/llvm-codegen.h"
#include "common/init.h"
#include "common/object-pool.h"
@@ -36,33 +35,36 @@
#include "exprs/like-predicate.h"
#include "exprs/literal.h"
#include "exprs/null-literal.h"
-#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
+#include "exprs/scalar-expr.h"
#include "exprs/string-functions.h"
#include "exprs/timestamp-functions.h"
#include "exprs/timezone_db.h"
#include "gen-cpp/Exprs_types.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/hive_metastore_types.h"
#include "rpc/thrift-client.h"
#include "rpc/thrift-server.h"
-#include "runtime/runtime-state.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.inline.h"
+#include "runtime/runtime-state.h"
#include "runtime/string-value.h"
#include "runtime/timestamp-parse-util.h"
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "service/fe-support.h"
#include "service/impala-server.h"
+#include "statestore/statestore.h"
+#include "testutil/gtest-util.h"
#include "testutil/impalad-query-executor.h"
#include "testutil/in-process-servers.h"
#include "udf/udf-test-harness.h"
+#include "util/asan.h"
#include "util/debug-util.h"
#include "util/string-parser.h"
#include "util/string-util.h"
#include "util/test-info.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
#include "common/names.h"
@@ -83,6 +85,8 @@ using namespace impala;
namespace impala {
ImpaladQueryExecutor* executor_;
+scoped_ptr<MetricGroup> statestore_metrics(new MetricGroup("statestore_metrics"));
+Statestore* statestore;
bool disable_codegen_;
bool enable_expr_rewrites_;
@@ -8798,11 +8802,16 @@ int main(int argc, char** argv) {
FLAGS_abort_on_config_error = false;
VLOG_CONNECTION << "creating test env";
VLOG_CONNECTION << "starting backends";
- InProcessStatestore* ips;
- ABORT_IF_ERROR(InProcessStatestore::StartWithEphemeralPorts(&ips));
+ statestore = new Statestore(statestore_metrics.get());
+ IGNORE_LEAKING_OBJECT(statestore);
+
+ // Pass in 0 to have the statestore use an ephemeral port for the service.
+ ABORT_IF_ERROR(statestore->Init(0));
InProcessImpalaServer* impala_server;
ABORT_IF_ERROR(InProcessImpalaServer::StartWithEphemeralPorts(
- FLAGS_hostname, ips->port(), &impala_server));
+ FLAGS_hostname, statestore->port(), &impala_server));
+ IGNORE_LEAKING_OBJECT(impala_server);
+
executor_ = new ImpaladQueryExecutor(FLAGS_hostname, impala_server->GetBeeswaxPort());
ABORT_IF_ERROR(executor_->Setup());
http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/service/session-expiry-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc
index b227c1f..89ae842 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -21,8 +21,10 @@
#include "rpc/thrift-client.h"
#include "service/fe-support.h"
#include "service/impala-server.h"
+#include "statestore/statestore.h"
#include "testutil/gtest-util.h"
#include "testutil/in-process-servers.h"
+#include "util/asan.h"
#include "util/impalad-metrics.h"
#include "util/time.h"
@@ -48,11 +50,14 @@ TEST(SessionTest, TestExpiry) {
FLAGS_idle_session_timeout = 1;
// Skip validation checks for in-process backend.
FLAGS_abort_on_config_error = false;
- InProcessStatestore* ips;
- ASSERT_OK(InProcessStatestore::StartWithEphemeralPorts(&ips));
+ scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
+ Statestore* statestore = new Statestore(metrics.get());
+ IGNORE_LEAKING_OBJECT(statestore);
+ // Pass in 0 to have the statestore use an ephemeral port for the service.
+ ABORT_IF_ERROR(statestore->Init(0));
InProcessImpalaServer* impala;
ASSERT_OK(InProcessImpalaServer::StartWithEphemeralPorts(
- "localhost", ips->port(), &impala));
+ "localhost", statestore->port(), &impala));
IntCounter* expired_metric =
impala->metrics()->FindMetricForTesting<IntCounter>(
ImpaladMetricKeys::NUM_SESSIONS_EXPIRED);
http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestore-test.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-test.cc b/be/src/statestore/statestore-test.cc
index b481a63..a9ee095 100644
--- a/be/src/statestore/statestore-test.cc
+++ b/be/src/statestore/statestore-test.cc
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-#include "testutil/gtest-util.h"
-#include "testutil/in-process-servers.h"
#include "common/init.h"
-#include "util/metrics.h"
#include "statestore/statestore-subscriber.h"
+#include "testutil/gtest-util.h"
+#include "util/asan.h"
+#include "util/metrics.h"
#include "common/names.h"
@@ -37,23 +37,29 @@ namespace impala {
TEST(StatestoreTest, SmokeTest) {
// All allocations done by 'new' to avoid problems shutting down Thrift servers
// gracefully.
- InProcessStatestore* ips;
- ASSERT_OK(InProcessStatestore::StartWithEphemeralPorts(&ips));
- ASSERT_TRUE(ips != NULL) << "Could not start Statestore";
+ scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
+ Statestore* statestore = new Statestore(metrics.get());
+ // Thrift will internally pick an ephemeral port if we pass in 0 as the port.
+ int statestore_port = 0;
+ IGNORE_LEAKING_OBJECT(statestore);
+ ASSERT_OK(statestore->Init(statestore_port));
+
+ scoped_ptr<MetricGroup> metrics_2(new MetricGroup("statestore_2"));
// Port already in use
- InProcessStatestore* statestore_wont_start =
- new InProcessStatestore(ips->port(), ips->port() + 10);
- ASSERT_FALSE(statestore_wont_start->Start().ok());
+ Statestore* statestore_wont_start = new Statestore(metrics_2.get());
+ ASSERT_FALSE(statestore_wont_start->Init(statestore->port()).ok());
- StatestoreSubscriber* sub_will_start = new StatestoreSubscriber("sub1",
- MakeNetworkAddress("localhost", 0),
- MakeNetworkAddress("localhost", ips->port()), new MetricGroup(""));
+ StatestoreSubscriber* sub_will_start =
+ new StatestoreSubscriber("sub1", MakeNetworkAddress("localhost", 0),
+ MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
+ IGNORE_LEAKING_OBJECT(sub_will_start);
ASSERT_OK(sub_will_start->Start());
// Confirm that a subscriber trying to use an in-use port will fail to start.
StatestoreSubscriber* sub_will_not_start = new StatestoreSubscriber("sub3",
MakeNetworkAddress("localhost", sub_will_start->heartbeat_port()),
- MakeNetworkAddress("localhost", ips->port()), new MetricGroup(""));
+ MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
+ IGNORE_LEAKING_OBJECT(sub_will_not_start);
ASSERT_FALSE(sub_will_not_start->Start().ok());
}
@@ -67,13 +73,17 @@ TEST(StatestoreSslTest, SmokeTest) {
server_key << impala_home << "/be/src/testutil/server-key.pem";
FLAGS_ssl_private_key = server_key.str();
- InProcessStatestore* statestore;
- ASSERT_OK(InProcessStatestore::StartWithEphemeralPorts(&statestore));
- if (statestore == NULL) FAIL() << "Unable to start Statestore";
+ // Thrift will internally pick an ephemeral port if we pass in 0 as the port.
+ int statestore_port = 0;
+ scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
+ Statestore* statestore = new Statestore(metrics.get());
+ IGNORE_LEAKING_OBJECT(statestore);
+ ASSERT_OK(statestore->Init(statestore_port));
StatestoreSubscriber* sub_will_start = new StatestoreSubscriber("smoke_sub1",
MakeNetworkAddress("localhost", 0),
MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
+ IGNORE_LEAKING_OBJECT(sub_will_start);
ASSERT_OK(sub_will_start->Start());
stringstream invalid_server_cert;
@@ -83,6 +93,7 @@ TEST(StatestoreSslTest, SmokeTest) {
StatestoreSubscriber* sub_will_not_start = new StatestoreSubscriber("smoke_sub2",
MakeNetworkAddress("localhost", 0),
MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
+ IGNORE_LEAKING_OBJECT(sub_will_not_start);
ASSERT_FALSE(sub_will_not_start->Start().ok());
}
http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index a58aec1..a208d97 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -28,6 +28,7 @@
#include "common/status.h"
#include "gen-cpp/StatestoreService_types.h"
+#include "rpc/rpc-trace.h"
#include "rpc/thrift-util.h"
#include "statestore/failure-detector.h"
#include "statestore/statestore-subscriber-client-wrapper.h"
@@ -98,6 +99,12 @@ DEFINE_int32(statestore_update_tcp_timeout_seconds, 300, "(Advanced) The time af
"badly hung machines that are not able to respond to the update RPC in short "
"order.");
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_private_key_password_cmd);
+DECLARE_string(ssl_cipher_list);
+DECLARE_string(ssl_minimum_version);
+
// Metric keys
// TODO: Replace 'backend' with 'subscriber' when we can coordinate a change with CM
const string STATESTORE_LIVE_SUBSCRIBERS = "statestore.live-backends";
@@ -408,6 +415,7 @@ Statestore::Statestore(MetricGroup* metrics)
FLAGS_statestore_max_missed_heartbeats / 2)) {
DCHECK(metrics != NULL);
+ metrics_ = metrics;
num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics,
STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>());
@@ -426,10 +434,31 @@ Statestore::Statestore(MetricGroup* metrics)
heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
}
-Status Statestore::Init() {
+Status Statestore::Init(int32_t state_store_port) {
+ boost::shared_ptr<TProcessor> processor(new StatestoreServiceProcessor(thrift_iface()));
+ boost::shared_ptr<TProcessorEventHandler> event_handler(
+ new RpcEventHandler("statestore", metrics_));
+ processor->setEventHandler(event_handler);
+ ThriftServerBuilder builder("StatestoreService", processor, state_store_port);
+ if (IsInternalTlsConfigured()) {
+ SSLProtocol ssl_version;
+ RETURN_IF_ERROR(
+ SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
+ LOG(INFO) << "Enabling SSL for Statestore";
+ builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
+ .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
+ .ssl_version(ssl_version)
+ .cipher_list(FLAGS_ssl_cipher_list);
+ }
+ ThriftServer* server;
+ RETURN_IF_ERROR(builder.metrics(metrics_).Build(&server));
+ thrift_server_.reset(server);
+ RETURN_IF_ERROR(thrift_server_->Start());
+
RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
RETURN_IF_ERROR(subscriber_priority_topic_update_threadpool_.Init());
RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
+
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 71e1ade..1d7f1a2 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -123,9 +123,10 @@ class Statestore : public CacheLineAligned {
/// The only constructor; initialises member variables only.
Statestore(MetricGroup* metrics);
+ /// Initialize and start the backing ThriftServer with port 'state_store_port'.
/// Initialize the ThreadPools used for updates and heartbeats. Returns an error if
- /// ThreadPool initialization fails.
- Status Init() WARN_UNUSED_RESULT;
+ /// any of the above initialization fails.
+ Status Init(int32_t state_store_port) WARN_UNUSED_RESULT;
/// Registers a new subscriber with the given unique subscriber ID, running a subscriber
/// service at the given location, with the provided list of topic subscriptions.
@@ -158,6 +159,9 @@ class Statestore : public CacheLineAligned {
static const std::string IMPALA_MEMBERSHIP_TOPIC;
/// Topic tracking the state of admission control on all coordinators.
static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
+
+ int32_t port() { return thrift_server_->port(); }
+
private:
/// A TopicEntry is a single entry in a topic, and logically is a <string, byte string>
/// pair.
@@ -526,6 +530,12 @@ class Statestore : public CacheLineAligned {
/// of time.
boost::scoped_ptr<StatestoreSubscriberClientCache> heartbeat_client_cache_;
+ /// Container for the internal statestore service.
+ boost::scoped_ptr<ThriftServer> thrift_server_;
+
+ /// Pointer to the MetricGroup for this statestore. Not owned.
+ MetricGroup* metrics_;
+
/// Thrift API implementation which proxies requests onto this Statestore
boost::shared_ptr<StatestoreServiceIf> thrift_iface_;
http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestored-main.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc
index 633d449..d7db794 100644
--- a/be/src/statestore/statestored-main.cc
+++ b/be/src/statestore/statestored-main.cc
@@ -31,7 +31,6 @@
#include "util/common-metrics.h"
#include "util/debug-util.h"
#include "util/metrics.h"
-#include "util/openssl-util.h"
#include "util/memory-metrics.h"
#include "util/webserver.h"
#include "util/default-path-handlers.h"
@@ -39,12 +38,6 @@
DECLARE_int32(state_store_port);
DECLARE_int32(webserver_port);
DECLARE_bool(enable_webserver);
-DECLARE_string(principal);
-DECLARE_string(ssl_server_certificate);
-DECLARE_string(ssl_private_key);
-DECLARE_string(ssl_private_key_password_cmd);
-DECLARE_string(ssl_cipher_list);
-DECLARE_string(ssl_minimum_version);
#include "common/names.h"
@@ -82,7 +75,7 @@ int StatestoredMain(int argc, char** argv) {
CommonMetrics::InitCommonMetrics(metrics.get());
Statestore statestore(metrics.get());
- ABORT_IF_ERROR(statestore.Init());
+ ABORT_IF_ERROR(statestore.Init(FLAGS_state_store_port));
statestore.RegisterWebpages(webserver.get());
boost::shared_ptr<TProcessor> processor(
new StatestoreServiceProcessor(statestore.thrift_iface()));
@@ -90,21 +83,6 @@ int StatestoredMain(int argc, char** argv) {
new RpcEventHandler("statestore", metrics.get()));
processor->setEventHandler(event_handler);
- ThriftServer* server;
- ThriftServerBuilder builder("StatestoreService", processor, FLAGS_state_store_port);
- if (IsInternalTlsConfigured()) {
- SSLProtocol ssl_version;
- ABORT_IF_ERROR(
- SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
- LOG(INFO) << "Enabling SSL for Statestore";
- builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
- .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
- .ssl_version(ssl_version)
- .cipher_list(FLAGS_ssl_cipher_list);
- }
- ABORT_IF_ERROR(builder.metrics(metrics.get()).Build(&server));
- ABORT_IF_ERROR(server->Start());
-
statestore.MainLoop();
return 0;
http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 031a07e..8a786e0 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -105,39 +105,3 @@ int InProcessImpalaServer::GetBeeswaxPort() const {
int InProcessImpalaServer::GetHS2Port() const {
return impala_server_->GetHS2Port();
}
-
-Status InProcessStatestore::StartWithEphemeralPorts(InProcessStatestore** statestore) {
- *statestore = new InProcessStatestore(0, 0);
- return (*statestore)->Start();
-}
-
-InProcessStatestore::InProcessStatestore(int statestore_port, int webserver_port)
- : webserver_(new Webserver(webserver_port)),
- metrics_(new MetricGroup("statestore")),
- statestore_port_(statestore_port),
- statestore_(new Statestore(metrics_.get())) {
- AddDefaultUrlCallbacks(webserver_.get());
- statestore_->RegisterWebpages(webserver_.get());
-}
-
-Status InProcessStatestore::Start() {
- RETURN_IF_ERROR(statestore_->Init());
- RETURN_IF_ERROR(webserver_->Start());
- boost::shared_ptr<TProcessor> processor(
- new StatestoreServiceProcessor(statestore_->thrift_iface()));
-
- ThriftServerBuilder builder("StatestoreService", processor, statestore_port_);
- if (IsInternalTlsConfigured()) {
- LOG(INFO) << "Enabling SSL for Statestore";
- builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key);
- }
- ThriftServer* server;
- ABORT_IF_ERROR(builder.metrics(metrics_.get()).Build(&server));
- statestore_server_.reset(server);
- RETURN_IF_ERROR(Thread::Create("statestore", "main-loop",
- &Statestore::MainLoop, statestore_.get(), &statestore_main_loop_));
-
- RETURN_IF_ERROR(statestore_server_->Start());
- statestore_port_ = statestore_server_->port();
- return WaitForServer("localhost", statestore_port_, 10, 100);
-}
http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index 6ac9734..f863650 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -93,42 +93,6 @@ class InProcessImpalaServer {
boost::scoped_ptr<ExecEnv> exec_env_;
};
-/// An in-process statestore, with webserver and metrics.
-class InProcessStatestore {
- public:
-
- // Creates and starts an InProcessStatestore with ports chosen from the ephemeral port
- // range. Returns OK and sets *statestore on success. On failure, an error is
- /// returned and *statestore may or may not be set but is always invalid to use.
- static Status StartWithEphemeralPorts(InProcessStatestore** statestore);
-
- /// Constructs but does not start the statestore.
- InProcessStatestore(int statestore_port, int webserver_port);
-
- /// Starts the statestore server, and the processing thread.
- Status Start();
-
- uint32_t port() { return statestore_port_; }
-
- private:
- /// Websever object to serve debug pages through.
- boost::scoped_ptr<Webserver> webserver_;
-
- /// MetricGroup object
- boost::scoped_ptr<MetricGroup> metrics_;
-
- /// Port to start the statestore on.
- uint32_t statestore_port_;
-
- /// The statestore instance
- boost::scoped_ptr<Statestore> statestore_;
-
- /// Statestore Thrift server
- boost::scoped_ptr<ThriftServer> statestore_server_;
-
- std::unique_ptr<Thread> statestore_main_loop_;
-};
-
}
#endif