You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/11/05 21:57:55 UTC

[3/3] impala git commit: IMPALA-7775: fix some lifecycle issues in statestore/session tests

IMPALA-7775: fix some lifecycle issues in statestore/session tests

Background threads from the Statestore's thread pool continued
running in the background and could dereference invalid memory.
We make sure these threads are cleaned up before moving onto
the next test. Note that we don't clean up all background
threads, just the ones that had caused issues here.

I refactored the memory management a bit to put all objects
that we can't safely free into a single ObjectPool.

The statestore tests also had an issue with the lifetime of the
string flags FLAGS_ssl_*_certificate. Those were overwritten
with new values while the thread pool threads were still running,
which could cause use-after-free bugs.

Testing:
Looped the tests under ASAN with the "stress" utility running at the
same time to flush out races.

Ran core tests.

Change-Id: I3b25c8b8a96bfa1183ce273b3bb4debde234dd01
Reviewed-on: http://gerrit.cloudera.org:8080/11864
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/572722e3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/572722e3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/572722e3

Branch: refs/heads/master
Commit: 572722e3505a663560e3fff1fe04d8fe932c0959
Parents: f08642b
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Nov 1 12:00:37 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Nov 5 21:41:00 2018 +0000

----------------------------------------------------------------------
 be/src/service/session-expiry-test.cc | 13 ++++-
 be/src/statestore/statestore-test.cc  | 89 ++++++++++++++++++------------
 be/src/statestore/statestore.cc       | 11 ++++
 be/src/statestore/statestore.h        |  7 ++-
 4 files changed, 81 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/572722e3/be/src/service/session-expiry-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc
index 89ae842..7e5b86a 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -44,14 +44,20 @@ DECLARE_int32(beeswax_port);
 // TODO: Come up with a short-running test that confirms a session will keep itself alive
 // that doesn't depend upon being rescheduled in a timely fashion.
 
+// Object pool containing all objects that must live for the duration of the process.
+// E.g. objects that are singletons and never destroyed in a real daemon (so don't support
+// tear-down logic), but which we create multiple times in unit tests. We leak this pool
+// instead of destroying it to avoid destroying the contained objects.
+static ObjectPool* perm_objects;
+
 TEST(SessionTest, TestExpiry) {
   const int NUM_SESSIONS = 5;
   const int MAX_IDLE_TIMEOUT_MS = 4000;
   FLAGS_idle_session_timeout = 1;
   // Skip validation checks for in-process backend.
   FLAGS_abort_on_config_error = false;
-  scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
-  Statestore* statestore = new Statestore(metrics.get());
+  MetricGroup* metrics = perm_objects->Add(new MetricGroup("statestore"));
+  Statestore* statestore = perm_objects->Add(new Statestore(metrics));
   IGNORE_LEAKING_OBJECT(statestore);
   // Pass in 0 to have the statestore use an ephemeral port for the service.
   ABORT_IF_ERROR(statestore->Init(0));
@@ -111,11 +117,14 @@ TEST(SessionTest, TestExpiry) {
   // work). Sleep to allow the threads closing the session to complete before tearing down
   // the server.
   SleepForMs(1000);
+  statestore->ShutdownForTesting();
 }
 
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
   InitFeSupport();
+  perm_objects = new ObjectPool;
+  IGNORE_LEAKING_OBJECT(perm_objects);
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/572722e3/be/src/statestore/statestore-test.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-test.cc b/be/src/statestore/statestore-test.cc
index a9ee095..f517a56 100644
--- a/be/src/statestore/statestore-test.cc
+++ b/be/src/statestore/statestore-test.cc
@@ -34,77 +34,94 @@ DECLARE_int32(state_store_port);
 
 namespace impala {
 
+// Object pool containing all objects that must live for the duration of the process.
+// E.g. objects that are singletons and never destroyed in a real daemon (so don't support
+// tear-down logic), but which we create multiple times in unit tests. We leak this pool
+// instead of destroying it to avoid destroying the contained objects.
+static ObjectPool* perm_objects;
+
 TEST(StatestoreTest, SmokeTest) {
   // All allocations done by 'new' to avoid problems shutting down Thrift servers
   // gracefully.
-  scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
-  Statestore* statestore = new Statestore(metrics.get());
+  MetricGroup* metrics = perm_objects->Add(new MetricGroup("statestore"));
+  Statestore* statestore = perm_objects->Add(new Statestore(metrics));
   // Thrift will internally pick an ephemeral port if we pass in 0 as the port.
   int statestore_port = 0;
-  IGNORE_LEAKING_OBJECT(statestore);
   ASSERT_OK(statestore->Init(statestore_port));
 
-  scoped_ptr<MetricGroup> metrics_2(new MetricGroup("statestore_2"));
+  MetricGroup* metrics_2 = perm_objects->Add(new MetricGroup("statestore_2"));
   // Port already in use
-  Statestore* statestore_wont_start = new Statestore(metrics_2.get());
+  Statestore* statestore_wont_start = perm_objects->Add(new Statestore(metrics_2));
   ASSERT_FALSE(statestore_wont_start->Init(statestore->port()).ok());
 
-  StatestoreSubscriber* sub_will_start =
+  StatestoreSubscriber* sub_will_start = perm_objects->Add(
       new StatestoreSubscriber("sub1", MakeNetworkAddress("localhost", 0),
-          MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
-  IGNORE_LEAKING_OBJECT(sub_will_start);
+          MakeNetworkAddress("localhost", statestore->port()), new MetricGroup("")));
   ASSERT_OK(sub_will_start->Start());
 
   // Confirm that a subscriber trying to use an in-use port will fail to start.
-  StatestoreSubscriber* sub_will_not_start = new StatestoreSubscriber("sub3",
-      MakeNetworkAddress("localhost", sub_will_start->heartbeat_port()),
-      MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
-  IGNORE_LEAKING_OBJECT(sub_will_not_start);
+  StatestoreSubscriber* sub_will_not_start = perm_objects->Add(new StatestoreSubscriber(
+      "sub3", MakeNetworkAddress("localhost", sub_will_start->heartbeat_port()),
+      MakeNetworkAddress("localhost", statestore->port()), new MetricGroup("")));
   ASSERT_FALSE(sub_will_not_start->Start().ok());
+
+  statestore->ShutdownForTesting();
 }
 
-TEST(StatestoreSslTest, SmokeTest) {
+// Runs an SSL smoke test with provided parameters.
+void SslSmokeTestHelper(const string& server_ca_certificate,
+    const string& client_ca_certificate, bool sub_should_start) {
   string impala_home(getenv("IMPALA_HOME"));
   stringstream server_cert;
   server_cert << impala_home << "/be/src/testutil/server-cert.pem";
-  FLAGS_ssl_server_certificate = server_cert.str();
-  FLAGS_ssl_client_ca_certificate = server_cert.str();
+  // Override flags for the duration of this test. Modifying them while the statestore
+  // is running is unsafe.
+  FLAGS_ssl_server_certificate = server_ca_certificate;
+  FLAGS_ssl_client_ca_certificate = client_ca_certificate;
   stringstream server_key;
   server_key << impala_home << "/be/src/testutil/server-key.pem";
   FLAGS_ssl_private_key = server_key.str();
 
   // Thrift will internally pick an ephemeral port if we pass in 0 as the port.
   int statestore_port = 0;
-  scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
-  Statestore* statestore = new Statestore(metrics.get());
-  IGNORE_LEAKING_OBJECT(statestore);
+  MetricGroup* metrics = perm_objects->Add(new MetricGroup("statestore"));
+  Statestore* statestore = perm_objects->Add(new Statestore(metrics));
   ASSERT_OK(statestore->Init(statestore_port));
 
-  StatestoreSubscriber* sub_will_start = new StatestoreSubscriber("smoke_sub1",
-      MakeNetworkAddress("localhost", 0),
-      MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
-  IGNORE_LEAKING_OBJECT(sub_will_start);
-  ASSERT_OK(sub_will_start->Start());
+  StatestoreSubscriber* sub = perm_objects->Add(
+      new StatestoreSubscriber("smoke_sub", MakeNetworkAddress("localhost", 0),
+          MakeNetworkAddress("localhost", statestore->port()), new MetricGroup("")));
+  Status sub_status = sub->Start();
+  ASSERT_EQ(sub_should_start, sub_status.ok());
 
-  stringstream invalid_server_cert;
-  invalid_server_cert << impala_home << "/be/src/testutil/invalid-server-cert.pem";
-  FLAGS_ssl_client_ca_certificate = invalid_server_cert.str();
+  statestore->ShutdownForTesting();
+}
 
-  StatestoreSubscriber* sub_will_not_start = new StatestoreSubscriber("smoke_sub2",
-      MakeNetworkAddress("localhost", 0),
-      MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
-  IGNORE_LEAKING_OBJECT(sub_will_not_start);
-  ASSERT_FALSE(sub_will_not_start->Start().ok());
+string GetValidServerCert() {
+  string impala_home(getenv("IMPALA_HOME"));
+  stringstream server_cert;
+  server_cert << impala_home << "/be/src/testutil/server-cert.pem";
+  return server_cert.str();
+}
+
+TEST(StatestoreSslTest, ValidCertSmokeTest) {
+  string valid_cert = GetValidServerCert();
+  SslSmokeTestHelper(valid_cert, valid_cert, true);
 }
 
+TEST(StatestoreSslTest, InvalidCertSmokeTest) {
+  string impala_home(getenv("IMPALA_HOME"));
+  stringstream invalid_server_cert;
+  invalid_server_cert << impala_home << "/be/src/testutil/invalid-server-cert.pem";
+  SslSmokeTestHelper(GetValidServerCert(), invalid_server_cert.str(), false);
 }
 
+} // namespace impala
+
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
-  int rc = RUN_ALL_TESTS();
-  // IMPALA-5291: statestore services and subscribers may still be running at this point
-  // and accessing global state. Exit without running global destructors to avoid
-  // races with other threads when tearing down the proces.
-  _exit(rc);
+  perm_objects = new ObjectPool;
+  IGNORE_LEAKING_OBJECT(perm_objects);
+  return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/572722e3/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 4e63dad..70183d7 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -36,6 +36,7 @@
 #include "util/debug-util.h"
 #include "util/logging-support.h"
 #include "util/openssl-util.h"
+#include "util/test-info.h"
 #include "util/time.h"
 #include "util/uid-util.h"
 #include "util/webserver.h"
@@ -1050,3 +1051,13 @@ void Statestore::MainLoop() {
   subscriber_priority_topic_update_threadpool_.Join();
   subscriber_heartbeat_threadpool_.Join();
 }
+
+void Statestore::ShutdownForTesting() {
+  CHECK(TestInfo::is_be_test()) << "Only valid to call in backend tests.";
+  subscriber_topic_update_threadpool_.Shutdown();
+  subscriber_priority_topic_update_threadpool_.Shutdown();
+  subscriber_heartbeat_threadpool_.Shutdown();
+  subscriber_topic_update_threadpool_.Join();
+  subscriber_priority_topic_update_threadpool_.Join();
+  subscriber_heartbeat_threadpool_.Join();
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/572722e3/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 52f7d68..871494c 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -157,6 +157,11 @@ class Statestore : public CacheLineAligned {
   /// The main processing loop. Runs infinitely.
   void MainLoop();
 
+  /// Shut down some background threads. Only used for testing. Note that this is not
+  /// a clean shutdown because we can't correctly tear down 'thrift_server_', so
+  /// not all background threads are stopped and this object cannot be destroyed.
+  void ShutdownForTesting();
+
   /// Returns the Thrift API interface that proxies requests onto the local Statestore.
   const boost::shared_ptr<StatestoreServiceIf>& thrift_iface() const {
     return thrift_iface_;
@@ -713,6 +718,6 @@ class Statestore : public CacheLineAligned {
   [[noreturn]] void MonitorSubscriberHeartbeat();
 };
 
-}
+} // namespace impala
 
 #endif