You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/08/01 22:11:43 UTC

[6/9] kudu git commit: KUDU-1358 (part 3): new multi-master stress test

KUDU-1358 (part 3): new multi-master stress test

This commit adds a stress test for multiple masters. The idea is simple:
issue DDL operations at a high rate while periodically restarting a master.

There's a balance to be struck both in the throughput of the operations and
in the periodicity of the restarts; we need to ensure that the masters can
make enough forward progress (in spite of the failures) to process all of
the requests without timing out. To assist, the client uses abnormally long
timeouts on all operations.

Change-Id: I40b5b78c100a7b427b2f4aac3a54665e82a9618c
Reviewed-on: http://gerrit.cloudera.org:8080/3611
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Dan Burkert <da...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fb2022c4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fb2022c4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fb2022c4

Branch: refs/heads/master
Commit: fb2022c4297945dcf6141e65f579b2172345adf9
Parents: 232474a
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Jul 8 18:52:51 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Aug 1 21:13:32 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/CMakeLists.txt       |  27 +-
 .../integration-tests/master-stress-test.cc     | 405 +++++++++++++++++++
 2 files changed, 419 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fb2022c4/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index b2fefae..0c2d449 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -40,34 +40,35 @@ add_dependencies(integration-tests
 
 # Tests
 set(KUDU_TEST_LINK_LIBS integration-tests ${KUDU_MIN_TEST_LIBS})
-ADD_KUDU_TEST(alter_table-test)
+ADD_KUDU_TEST(all_types-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(alter_table-randomized-test)
-ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port")
+ADD_KUDU_TEST(alter_table-test)
 ADD_KUDU_TEST(client_failover-itest)
 ADD_KUDU_TEST(client-stress-test
   RESOURCE_LOCK "master-rpc-ports"
   RUN_SERIAL true)
-ADD_KUDU_TEST(disk_reservation-itest)
-ADD_KUDU_TEST(master_replication-itest RESOURCE_LOCK "master-rpc-ports")
-ADD_KUDU_TEST(master_failover-itest RESOURCE_LOCK "master-rpc-ports")
-ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true)
-ADD_KUDU_TEST(table_locations-itest)
-ADD_KUDU_TEST(ts_tablet_manager-itest)
-ADD_KUDU_TEST(ts_recovery-itest)
+ADD_KUDU_TEST(create-table-itest)
 ADD_KUDU_TEST(create-table-stress-test)
 ADD_KUDU_TEST(delete_table-test)
+ADD_KUDU_TEST(disk_reservation-itest)
 ADD_KUDU_TEST(external_mini_cluster-test RESOURCE_LOCK "master-rpc-ports")
+ADD_KUDU_TEST(fuzz-itest)
 ADD_KUDU_TEST(linked_list-test RESOURCE_LOCK "master-rpc-ports")
-ADD_KUDU_TEST(all_types-itest RESOURCE_LOCK "master-rpc-ports")
+ADD_KUDU_TEST(master_failover-itest RESOURCE_LOCK "master-rpc-ports")
+ADD_KUDU_TEST(master_replication-itest RESOURCE_LOCK "master-rpc-ports")
+ADD_KUDU_TEST(master-stress-test RESOURCE_LOCK "master-rpc-ports")
+ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true)
+ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(remote_bootstrap-itest)
+ADD_KUDU_TEST(table_locations-itest)
 ADD_KUDU_TEST(tablet_replacement-itest)
-ADD_KUDU_TEST(create-table-itest)
-ADD_KUDU_TEST(fuzz-itest)
+ADD_KUDU_TEST(ts_recovery-itest)
+ADD_KUDU_TEST(ts_tablet_manager-itest)
 ADD_KUDU_TEST(write_throttling-itest)
 ADD_KUDU_TEST(exactly_once_writes-itest)
 
 # Some tests have additional dependencies
 set(KUDU_TEST_LINK_LIBS kudu_client kudu_tools_util ${KUDU_TEST_LINK_LIBS})
+ADD_KUDU_TEST(flex_partitioning-itest)
 ADD_KUDU_TEST(full_stack-insert-scan-test RUN_SERIAL true)
 ADD_KUDU_TEST(update_scan_delta_compact-test RUN_SERIAL true)
-ADD_KUDU_TEST(flex_partitioning-itest)

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb2022c4/src/kudu/integration-tests/master-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master-stress-test.cc b/src/kudu/integration-tests/master-stress-test.cc
new file mode 100644
index 0000000..b77d2eb
--- /dev/null
+++ b/src/kudu/integration-tests/master-stress-test.cc
@@ -0,0 +1,405 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "kudu/client/client.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/integration-tests/external_mini_cluster.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/oid_generator.h"
+#include "kudu/util/random.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(num_create_table_threads, 4,
+            "Number of threads that should create tables");
+DEFINE_int32(num_alter_table_threads, 2,
+            "Number of threads that should alter tables");
+DEFINE_int32(num_delete_table_threads, 2,
+            "Number of threads that should delete tables");
+DEFINE_int32(num_seconds_to_run, 5,
+             "Number of seconds that the test should run");
+
+namespace kudu {
+
+using client::KuduClient;
+using client::KuduClientBuilder;
+using client::KuduColumnSchema;
+using client::KuduSchema;
+using client::KuduSchemaBuilder;
+using client::KuduTable;
+using client::KuduTableAlterer;
+using client::KuduTableCreator;
+using master::MasterServiceProxy;
+using master::ListTablesRequestPB;
+using master::ListTablesResponsePB;
+using rpc::Messenger;
+using rpc::MessengerBuilder;
+using rpc::RpcController;
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+static const MonoDelta kDefaultAdminTimeout = MonoDelta::FromSeconds(300);
+
+class MasterStressTest : public KuduTest {
+ public:
+  MasterStressTest()
+    : done_(1),
+      num_tables_created_(0),
+      num_tables_altered_(0),
+      num_tables_deleted_(0),
+      num_masters_restarted_(0),
+      table_names_condvar_(&table_names_lock_),
+      rand_(SeedRandom()) {
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+
+    ExternalMiniClusterOptions opts;
+    opts.master_rpc_ports = { 11010, 11011, 11012 };
+    opts.num_masters = opts.master_rpc_ports.size();
+    opts.num_tablet_servers = 3;
+
+    // Because this test performs a lot of DDL operations, we end up flushing
+    // and rewriting metadata files quite a bit. Globally disabling fsync
+    // speeds the test runtime up dramatically.
+    opts.extra_master_flags.push_back("--never_fsync");
+    opts.extra_tserver_flags.push_back("--never_fsync");
+
+    // Don't preallocate log segments, since we're creating many tablets here.
+    // If each preallocates 64M or so, we use a ton of disk space in this
+    // test, and it fails on normal sized /tmp dirs.
+    opts.extra_master_flags.push_back("--log_preallocate_segments=false");
+    opts.extra_tserver_flags.push_back("--log_preallocate_segments=false");
+
+    // Reduce various timeouts below so as to make the detection of leader master
+    // failures (specifically, failures as a result of long pauses) more rapid.
+
+    // Set max missed heartbeat periods to 1.0 (down from 3.0).
+    opts.extra_master_flags.push_back("--leader_failure_max_missed_heartbeat_periods=1.0");
+
+    // Set the TS->master heartbeat timeout to 1 second (down from 15 seconds).
+    opts.extra_tserver_flags.push_back("--heartbeat_rpc_timeout_ms=1000");
+
+    // Allow one TS heartbeat failure before retrying with back-off (down from 3).
+    opts.extra_tserver_flags.push_back("--heartbeat_max_failures_before_backoff=1");
+
+    // Wait for 500 ms after 'heartbeat_max_failures_before_backoff' failures
+    // before trying again (down from 1 second).
+    opts.extra_tserver_flags.push_back("--heartbeat_interval_ms=500");
+
+    cluster_.reset(new ExternalMiniCluster(opts));
+    ASSERT_OK(cluster_->Start());
+    KuduClientBuilder builder;
+
+    // Create and alter table operation timeouts can be extended via their
+    // builders, but there's no such option for DeleteTable, so we extend
+    // the global operation timeout.
+    builder.default_admin_operation_timeout(kDefaultAdminTimeout);
+
+    // Encourage the client to switch masters quickly in the event of failover.
+    builder.default_rpc_timeout(MonoDelta::FromSeconds(1));
+
+    ASSERT_OK(cluster_->CreateClient(builder, &client_));
+  }
+
+  void TearDown() override {
+    Shutdown();
+    KuduTest::TearDown();
+  }
+
+  void Shutdown() {
+    if (cluster_) {
+      cluster_->Shutdown();
+      cluster_.reset();
+    }
+  }
+
+  void CreateTableThread() {
+    while (done_.count()) {
+      // Create a basic table with a random name.
+      KuduSchema schema;
+      KuduSchemaBuilder b;
+      b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+      CHECK_OK(b.Build(&schema));
+
+      string to_create = GenerateTableName();
+      unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+      Status s = table_creator->table_name(to_create)
+          .schema(&schema)
+          .set_range_partition_columns({ "key" })
+          .wait(false)
+          .timeout(kDefaultAdminTimeout)
+          .Create();
+      if (s.IsAlreadyPresent()) {
+        // The client retried after the RPC timed out, but the master did in
+        // fact create the table.
+        //
+        // TODO: Should be fixed with Exactly Once semantics, see KUDU-1537.
+        continue;
+      }
+      if (s.IsServiceUnavailable()) {
+        // The client retried after the RPC timed out, and the retried RPC
+        // arrived at the master while the table was still being created.
+        //
+        // TODO: Should be fixed with Exactly Once semantics, see KUDU-1537.
+        continue;
+      }
+      if (s.IsInvalidArgument() &&
+          MatchPattern(s.ToString(), "*Not enough live tablet servers*")) {
+        // The test placed enough load on the cluster that some tservers
+        // haven't sent a heartbeat in a little while.
+        continue;
+      }
+      CHECK_OK(s);
+      num_tables_created_.Increment();
+      PutTableName(to_create);
+
+      done_.WaitFor(MonoDelta::FromMilliseconds(200));
+    }
+
+  }
+
+  void AlterTableThread() {
+    while (done_.count()) {
+      // Rename a table at random.
+      string to_alter;
+      if (!BlockingGetTableName(&to_alter)) {
+        break;
+      }
+      string new_table_name = GenerateTableName();
+      unique_ptr<KuduTableAlterer> table_alterer(
+          client_->NewTableAlterer(to_alter));
+      Status s = table_alterer
+        ->RenameTo(new_table_name)
+        ->wait(false)
+        ->timeout(kDefaultAdminTimeout)
+        ->Alter();
+      if (s.IsNotFound()) {
+        // The client retried after the RPC timed out, but the master did in
+        // fact rename the table, or is actively renaming it.
+        //
+        // TODO: Should be fixed with Exactly Once semantics, see KUDU-1537.
+        continue;
+      }
+      CHECK_OK(s);
+      num_tables_altered_.Increment();
+      PutTableName(new_table_name);
+
+      done_.WaitFor(MonoDelta::FromMilliseconds(200));
+    }
+  }
+
+  void DeleteTableThread() {
+    while (done_.count()) {
+      // Delete a table at random.
+      string to_delete;
+      if (!BlockingGetTableName(&to_delete)) {
+        break;
+      }
+      Status s = client_->DeleteTable(to_delete);
+      if (s.IsNotFound()) {
+        // The client retried after the RPC timed out, but the master did in
+        // fact delete the table.
+        //
+        // TODO: Should be fixed with Exactly Once semantics, see KUDU-1537.
+        continue;
+      }
+      CHECK_OK(s);
+      num_tables_deleted_.Increment();
+
+      done_.WaitFor(MonoDelta::FromMilliseconds(200));
+    }
+
+  }
+
+  Status WaitForMasterUpAndRunning(const shared_ptr<Messenger>& messenger,
+                                   ExternalMaster* master) {
+    unique_ptr<MasterServiceProxy> proxy(
+        new MasterServiceProxy(messenger, master->bound_rpc_addr()));
+    while (true) {
+      ListTablesRequestPB req;
+      ListTablesResponsePB resp;
+      RpcController rpc;
+      Status s = proxy->ListTables(req, &resp, &rpc);
+      if (s.ok()) {
+        if (!resp.has_error()) {
+          // This master is the leader and is up and running.
+          break;
+        } else {
+          s = StatusFromPB(resp.error().status());
+          if (s.IsIllegalState()) {
+            // This master is not the leader but is otherwise up and running.
+            break;
+          } else if (!s.IsServiceUnavailable()) {
+            // Unexpected error from master.
+            return s;
+          }
+        }
+      } else if (!s.IsNetworkError()) {
+        // Unexpected error from proxy.
+        return s;
+      }
+
+      // There was some kind of transient network error or the master isn't yet
+      // ready. Sleep and retry.
+      SleepFor(MonoDelta::FromMilliseconds(50));
+    }
+    return Status::OK();
+  }
+
+  void RestartMasterLoop() {
+    shared_ptr<Messenger> messenger;
+    MessengerBuilder bld("RestartMasterMessenger");
+    CHECK_OK(bld.Build(&messenger));
+
+    MonoTime deadline(MonoTime::Now(MonoTime::FINE));
+    deadline.AddDelta(MonoDelta::FromSeconds(FLAGS_num_seconds_to_run));
+
+    MonoTime now(MonoTime::Now(MonoTime::FINE));
+    while (now.ComesBefore(deadline)) {
+      ExternalMaster* master = cluster_->master(
+          rand_.Uniform(cluster_->num_masters()));
+      master->Shutdown();
+
+      // Give the rest of the test a chance to operate with the master down.
+      SleepFor(MonoDelta::FromMilliseconds(rand_.Uniform(500)));
+
+      CHECK_OK(master->Restart());
+
+      // Wait for the master to start servicing requests before restarting the
+      // next one.
+      //
+      // This isn't necessary for correctness, but it helps give the masters
+      // enough uptime so that they can actually make forward progress on
+      // client requests.
+      CHECK_OK(WaitForMasterUpAndRunning(messenger, master));
+      num_masters_restarted_.Increment();
+
+      SleepFor(MonoDelta::FromMilliseconds(rand_.Uniform(200)));
+      now = MonoTime::Now(MonoTime::FINE);
+    }
+  }
+
+ protected:
+  CountDownLatch done_;
+  AtomicInt<uint64_t> num_tables_created_;
+  AtomicInt<uint64_t> num_tables_altered_;
+  AtomicInt<uint64_t> num_tables_deleted_;
+  AtomicInt<uint64_t> num_masters_restarted_;
+
+  Mutex table_names_lock_;
+  ConditionVariable table_names_condvar_;
+  vector<string> table_names_;
+
+ private:
+  string GenerateTableName() {
+    return Substitute("table-$0", oid_generator_.Next());
+  }
+
+  bool BlockingGetTableName(string* chosen_table) {
+    std::lock_guard<Mutex> l(table_names_lock_);
+    while (table_names_.empty() && done_.count()) {
+      table_names_condvar_.Wait();
+    }
+    if (done_.count() == 0) {
+      return false;
+    }
+
+    // Choose a table name at random. Remove it from the vector by replacing
+    // it with the last name; this is more efficient than erasing in place.
+    int num_tables = table_names_.size();
+    int idx = rand_.Uniform(num_tables);
+    *chosen_table = table_names_[idx];
+    if (num_tables > 1) {
+      table_names_[idx] = table_names_[num_tables - 1];
+    }
+    table_names_.pop_back();
+    return true;
+  }
+
+  void PutTableName(const string& table_name) {
+    std::lock_guard<Mutex> l(table_names_lock_);
+    table_names_.push_back(table_name);
+    table_names_condvar_.Signal();
+  }
+
+  ThreadSafeRandom rand_;
+  ObjectIdGenerator oid_generator_;
+  unique_ptr<ExternalMiniCluster> cluster_;
+  client::sp::shared_ptr<KuduClient> client_;
+};
+
+TEST_F(MasterStressTest, Test) {
+  OverrideFlagForSlowTests("num_create_table_threads", "10");
+  OverrideFlagForSlowTests("num_alter_table_threads", "5");
+  OverrideFlagForSlowTests("num_delete_table_threads", "5");
+  OverrideFlagForSlowTests("num_seconds_to_run", "30");
+
+  // Start all of the threads.
+  vector<thread> threads;
+  for (int i = 0; i < FLAGS_num_create_table_threads; i++) {
+    threads.emplace_back(&MasterStressTest::CreateTableThread, this);
+  }
+  for (int i = 0; i < FLAGS_num_alter_table_threads; i++) {
+    threads.emplace_back(&MasterStressTest::AlterTableThread, this);
+  }
+  for (int i = 0; i < FLAGS_num_delete_table_threads; i++) {
+    threads.emplace_back(&MasterStressTest::DeleteTableThread, this);
+  }
+
+  // Let the test run. The main thread will periodically restart masters.
+  RestartMasterLoop();
+
+  // Stop all of the threads.
+  done_.CountDown();
+  table_names_condvar_.Broadcast();
+  int i = 0;
+  for (auto& t : threads) {
+    LOG(INFO) << Substitute("Killing test thread $0/$1", ++i, threads.size());
+    t.join();
+    LOG(INFO) << Substitute("Killed test thread $0/$1", i, threads.size());
+  }
+
+  // Shut down now so that the log messages below are more visible.
+  Shutdown();
+
+  LOG(INFO) << "Tables created: " << num_tables_created_.Load();
+  LOG(INFO) << "Tables altered: " << num_tables_altered_.Load();
+  LOG(INFO) << "Tables deleted: " << num_tables_deleted_.Load();
+  LOG(INFO) << "Masters restarted: " << num_masters_restarted_.Load();
+}
+
+} // namespace kudu