Posted to commits@kudu.apache.org by al...@apache.org on 2018/07/04 01:25:33 UTC

kudu git commit: [tools] extra integration tests for the rebalancer

Repository: kudu
Updated Branches:
  refs/heads/master 7b048b8db -> a81d80a79


[tools] extra integration tests for the rebalancer

This patch adds more integration tests for the rebalancer,
providing coverage for the following scenarios:
  * a new tablet server is added post-rebalancing,
    and the rebalancer is run again
  * DDL operations are run concurrently with the rebalancing
  * two rebalancers are running concurrently
  * a tablet server goes down during rebalancing
  * a tablet server is added during rebalancing
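
All of these scenarios drive the rebalancer through the 'kudu cluster
rebalance' CLI. For context, here is a minimal sketch (not part of the patch)
of the invocation pattern the new tests build on: run the tool via the
RunKuduTool() helper from tool_test_util.h against the master's RPC address
and assert on its report. The helper name RunRebalancerAndCheck and the
explicit gtest include are illustrative assumptions; see kudu-admin-test.cc
below for the actual test code.

  // Hypothetical helper (illustration only): invoke the rebalancer CLI tool
  // and verify it reports a balanced cluster.
  #include <string>
  #include <vector>

  #include <gtest/gtest.h>

  #include "kudu/tools/tool_test_util.h"  // RunKuduTool()
  #include "kudu/util/status.h"
  #include "kudu/util/test_macros.h"      // ASSERT_STR_CONTAINS()

  namespace kudu {
  namespace tools {

  void RunRebalancerAndCheck(const std::string& master_rpc_addr) {
    const std::vector<std::string> tool_args = {
      "cluster",
      "rebalance",
      master_rpc_addr,
    };
    std::string out;
    std::string err;
    const Status s = RunKuduTool(tool_args, &out, &err);
    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
    // On success the tool prints a summary line like the one below.
    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
        << "stderr: " << err;
  }

  } // namespace tools
  } // namespace kudu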

Change-Id: I78b3dcea71ed303f6ecd199604b2385796d05da8
Reviewed-on: http://gerrit.cloudera.org:8080/10540
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a81d80a7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a81d80a7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a81d80a7

Branch: refs/heads/master
Commit: a81d80a7987cd66c4d0e697058ecd817d2707a9e
Parents: 7b048b8
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue May 29 12:42:43 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Jul 4 01:24:17 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/CMakeLists.txt     |   3 +-
 src/kudu/tools/kudu-admin-test.cc | 654 ++++++++++++++++++++++++++++++---
 2 files changed, 610 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a81d80a7/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index 97d8bdb..e367d64 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -164,7 +164,8 @@ set(KUDU_TEST_LINK_LIBS
 ADD_KUDU_TEST(diagnostics_log_parser-test)
 ADD_KUDU_TEST(ksck-test)
 ADD_KUDU_TEST(ksck_remote-test PROCESSORS 3)
-ADD_KUDU_TEST(kudu-admin-test PROCESSORS 3)
+ADD_KUDU_TEST(kudu-admin-test
+  NUM_SHARDS 4 PROCESSORS 3)
 ADD_KUDU_TEST_DEPENDENCIES(kudu-admin-test
   kudu)
 ADD_KUDU_TEST(kudu-tool-test

http://git-wip-us.apache.org/repos/asf/kudu/blob/a81d80a7/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index 1868c90..856f1f4 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <algorithm>
+#include <atomic>
 #include <cstdint>
 #include <cstdio>
 #include <deque>
@@ -23,6 +24,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <unordered_map>
 #include <unordered_set>
 #include <utility>
@@ -46,6 +48,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/cluster_verifier.h"
 #include "kudu/integration-tests/test_workload.h"
@@ -55,10 +58,13 @@
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tools/tool_test_util.h"
 #include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/util/countdown_latch.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -68,7 +74,10 @@ DECLARE_int32(num_tablet_servers);
 
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
+using kudu::client::KuduColumnSchema;
 using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduTableAlterer;
 using kudu::client::KuduTableCreator;
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::ExternalTabletServer;
@@ -92,16 +101,22 @@ using kudu::itest::WaitUntilTabletInState;
 using kudu::itest::WaitUntilTabletRunning;
 using kudu::master::VOTER_REPLICA;
 using kudu::pb_util::SecureDebugString;
+using std::atomic;
 using std::back_inserter;
 using std::copy;
 using std::deque;
+using std::ostringstream;
 using std::string;
+using std::thread;
 using std::unique_ptr;
 using std::vector;
 using strings::Split;
 using strings::Substitute;
 
 namespace kudu {
+
+class Schema;
+
 namespace tools {
 
 class AdminCliTest : public tserver::TabletServerIntegrationTestBase {
@@ -1357,6 +1372,55 @@ TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
   ASSERT_STR_MATCHES(err, err_msg_pattern);
 }
 
+// Create tables with unbalanced replica distribution: useful in
+// rebalancer-related tests.
+static Status CreateUnbalancedTables(
+    cluster::ExternalMiniCluster* cluster,
+    client::KuduClient* client,
+    const Schema& table_schema,
+    const string& table_name_pattern,
+    int num_tables,
+    int rep_factor,
+    int tserver_idx_from,
+    int tserver_num,
+    int tserver_unresponsive_ms) {
+  // Keep running only some tablet servers and shut down the rest.
+  for (auto i = tserver_idx_from; i < tserver_num; ++i) {
+    cluster->tablet_server(i)->Shutdown();
+  }
+
+  // Wait for the catalog manager to understand that not all tablet servers
+  // are available.
+  SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
+
+  // Create tables with their tablet replicas landing only on the tablet servers
+  // which are up and running.
+  KuduSchema client_schema(client::KuduSchemaFromSchema(table_schema));
+  for (auto i = 0; i < num_tables; ++i) {
+    const string table_name = Substitute(table_name_pattern, i);
+    unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+    RETURN_NOT_OK(table_creator->table_name(table_name)
+                  .schema(&client_schema)
+                  .add_hash_partitions({ "key" }, 3)
+                  .num_replicas(rep_factor)
+                  .Create());
+    RETURN_NOT_OK(RunKuduTool({
+      "perf",
+      "loadgen",
+      cluster->master()->bound_rpc_addr().ToString(),
+      Substitute("--table_name=$0", table_name),
+      Substitute("--table_num_replicas=$0", rep_factor),
+      "--string_fixed=unbalanced_tables_test",
+    }));
+  }
+
+  for (auto i = tserver_idx_from; i < tserver_num; ++i) {
+    RETURN_NOT_OK(cluster->tablet_server(i)->Restart());
+  }
+
+  return Status::OK();
+}
+
 // A test to verify that rebalancing works for both 3-4-3 and 3-2-3 replica
 // management schemes. During replica movement, a light workload is run against
 // every table being rebalanced. This test covers different replication factors.
@@ -1394,33 +1458,9 @@ TEST_P(RebalanceParamTest, Rebalance) {
   FLAGS_num_replicas = kRepFactor;
   NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
 
-  // Keep running only (kRepFactor + 1) tablet servers and shut down the rest.
-  for (auto i = kRepFactor + 1; i < kNumTservers; ++i) {
-    cluster_->tablet_server(i)->Shutdown();
-  }
-
-  // Wait for the catalog manager to understand that only (kRepFactor + 1)
-  // tablet servers are available.
-  SleepFor(MonoDelta::FromMilliseconds(5 * kTserverUnresponsiveMs / 4));
-
-  // Create few tables with their tablet replicas landing only on those
-  // (kRepFactor + 1) running tablet servers.
-  KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
-  for (auto i = 0; i < kNumTables; ++i) {
-    const string table_name = Substitute(table_name_pattern, i);
-    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
-    ASSERT_OK(table_creator->table_name(table_name)
-              .schema(&client_schema)
-              .add_hash_partitions({ "key" }, 3)
-              .num_replicas(kRepFactor)
-              .Create());
-    ASSERT_OK(RunKuduTool({
-      "perf",
-      "loadgen",
-      cluster_->master()->bound_rpc_addr().ToString(),
-      Substitute("--table_name=$0", table_name),
-    }));
-  }
+  ASSERT_OK(CreateUnbalancedTables(
+      cluster_.get(), client_.get(), schema_, table_name_pattern, kNumTables,
+      kRepFactor, kRepFactor + 1, kNumTservers, kTserverUnresponsiveMs));
 
   // Workloads aren't run for 3-2-3 replica movement with RF = 1 because
   // the tablet is unavailable during the move until the target voter replica
@@ -1449,21 +1489,20 @@ TEST_P(RebalanceParamTest, Rebalance) {
     }
   }
 
-  for (auto i = (kRepFactor + 1); i < kNumTservers; ++i) {
-    ASSERT_OK(cluster_->tablet_server(i)->Restart());
-  }
+  const vector<string> tool_args = {
+    "cluster",
+    "rebalance",
+    cluster_->master()->bound_rpc_addr().ToString(),
+    "--move_single_replicas",
+  };
 
   {
     string out;
     string err;
-    const Status s = RunKuduTool({
-      "cluster",
-      "rebalance",
-      cluster_->master()->bound_rpc_addr().ToString(),
-      "--move_single_replicas",
-    }, &out, &err);
+    const Status s = RunKuduTool(tool_args, &out, &err);
     ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err;
-    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced");
+    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+        << "stderr: " << err;
   }
 
   // Next run should report the cluster as balanced and no replica movement
@@ -1471,15 +1510,11 @@ TEST_P(RebalanceParamTest, Rebalance) {
   {
     string out;
     string err;
-    const Status s = RunKuduTool({
-      "cluster",
-      "rebalance",
-      cluster_->master()->bound_rpc_addr().ToString(),
-      "--move_single_replicas",
-    }, &out, &err);
+    const Status s = RunKuduTool(tool_args, &out, &err);
     ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err;
     ASSERT_STR_CONTAINS(out,
-        "rebalancing is complete: cluster is balanced (moved 0 replicas)");
+        "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+        << "stderr: " << err;
   }
 
   for (auto& workload : workloads) {
@@ -1487,9 +1522,536 @@ TEST_P(RebalanceParamTest, Rebalance) {
   }
 
   NO_FATALS(cluster_->AssertNoCrashes());
+  NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
 
-  ClusterVerifier v(cluster_.get());
-  NO_FATALS(v.CheckCluster());
+  // Now add a new tablet server into the cluster and make sure the rebalancer
+  // will re-distribute replicas.
+  ASSERT_OK(cluster_->AddTabletServer());
+  {
+    string out;
+    string err;
+    const Status s = RunKuduTool(tool_args, &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err;
+    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+        << "stderr: " << err;
+    // The cluster is unbalanced, so many replicas should have been moved.
+    ASSERT_STR_NOT_CONTAINS(out, "(moved 0 replicas)");
+  }
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+}
+
+// Common base for the rebalancer-related tests below.
+class RebalancingTest :
+    public tserver::TabletServerIntegrationTestBase,
+    public ::testing::WithParamInterface<Kudu1097> {
+ public:
+  RebalancingTest(int num_tables = 10,
+                  int rep_factor = 3,
+                  int num_tservers = 8,
+                  int tserver_unresponsive_ms = 3000,
+                  const string& table_name_pattern = "rebalance_test_table_$0")
+      : TabletServerIntegrationTestBase(),
+        is_343_scheme_(GetParam() == Kudu1097::Enable),
+        num_tables_(num_tables),
+        rep_factor_(rep_factor),
+        num_tservers_(num_tservers),
+        tserver_unresponsive_ms_(tserver_unresponsive_ms),
+        table_name_pattern_(table_name_pattern) {
+    master_flags_ = {
+      Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_),
+      Substitute("--tserver_unresponsive_timeout_ms=$0", tserver_unresponsive_ms_),
+    };
+    tserver_flags_ = {
+      Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_),
+    };
+  }
+
+ protected:
+  void Prepare(const vector<string>& extra_tserver_flags = {},
+               const vector<string>& extra_master_flags = {}) {
+    copy(extra_tserver_flags.begin(), extra_tserver_flags.end(),
+         back_inserter(tserver_flags_));
+    copy(extra_master_flags.begin(), extra_master_flags.end(),
+         back_inserter(master_flags_));
+
+    FLAGS_num_tablet_servers = num_tservers_;
+    FLAGS_num_replicas = rep_factor_;
+    NO_FATALS(BuildAndStart(tserver_flags_, master_flags_));
+
+    ASSERT_OK(CreateUnbalancedTables(
+        cluster_.get(), client_.get(), schema_, table_name_pattern_,
+        num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
+        tserver_unresponsive_ms_));
+  }
+
+  // When the rebalancer starts moving replicas, ksck reports corruption
+  // (hence the RuntimeError status), seeing the affected tables as unhealthy
+  // with the data state of the corresponding tablets as TABLET_DATA_COPYING.
+  // If using this method, it's a good idea to inject some latency into tablet
+  // copying to be able to spot the TABLET_DATA_COPYING state; see the
+  // '--tablet_copy_download_file_inject_latency_ms' flag for tablet servers.
+  bool IsRebalancingInProgress() {
+    string out;
+    const auto s = RunKuduTool({
+      "cluster",
+      "ksck",
+      cluster_->master()->bound_rpc_addr().ToString(),
+    }, &out);
+    if (s.IsRuntimeError() &&
+        out.find("Data state:  TABLET_DATA_COPYING") != string::npos) {
+      return true;
+    }
+    return false;
+  }
+
+  const bool is_343_scheme_;
+  const int num_tables_;
+  const int rep_factor_;
+  const int num_tservers_;
+  const int tserver_unresponsive_ms_;
+  const string table_name_pattern_;
+  vector<string> tserver_flags_;
+  vector<string> master_flags_;
+};
+
+// Make sure the rebalancer is able to do its job if running concurrently
+// with DDL activity on the cluster.
+class DDLDuringRebalancingTest : public RebalancingTest {
+ public:
+  DDLDuringRebalancingTest()
+      : RebalancingTest(20 /* num_tables */) {
+  }
+};
+INSTANTIATE_TEST_CASE_P(, DDLDuringRebalancingTest,
+                        ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(DDLDuringRebalancingTest, TablesCreatedAndDeletedDuringRebalancing) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  NO_FATALS(Prepare());
+
+  // The latch that controls the lifecycle of the concurrent DDL activity.
+  CountDownLatch run_latch(1);
+
+  thread creator([&]() {
+    KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
+    for (auto idx = 0; ; ++idx) {
+      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
+        break;
+      }
+      const string table_name = Substitute("rebalancer_extra_table_$0", idx);
+      unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+      CHECK_OK(table_creator->table_name(table_name)
+               .schema(&client_schema)
+               .add_hash_partitions({ "key" }, 3)
+               .num_replicas(rep_factor_)
+               .Create());
+    }
+  });
+  auto creator_cleanup = MakeScopedCleanup([&]() {
+    run_latch.CountDown();
+    creator.join();
+  });
+
+  thread deleter([&]() {
+    for (auto idx = 0; idx < num_tables_; ++idx) {
+      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
+        break;
+      }
+      CHECK_OK(client_->DeleteTable(Substitute(table_name_pattern_, idx)));
+    }
+  });
+  auto deleter_cleanup = MakeScopedCleanup([&]() {
+    run_latch.CountDown();
+    deleter.join();
+  });
+
+  thread alterer([&]() {
+    const string kTableName = "rebalancer_dynamic_table";
+    const string kNewTableName = "rebalancer_dynamic_table_new_name";
+    while (true) {
+      // Create table.
+      {
+        KuduSchema schema;
+        KuduSchemaBuilder builder;
+        builder.AddColumn("key")->Type(KuduColumnSchema::INT64)->
+            NotNull()->
+            PrimaryKey();
+        builder.AddColumn("a")->Type(KuduColumnSchema::INT64);
+        CHECK_OK(builder.Build(&schema));
+        unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+        CHECK_OK(table_creator->table_name(kTableName)
+                 .schema(&schema)
+                 .set_range_partition_columns({})
+                 .num_replicas(rep_factor_)
+                 .Create());
+      }
+      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+        break;
+      }
+
+      // Drop a column.
+      {
+        unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
+        alt->DropColumn("a");
+        CHECK_OK(alt->Alter());
+      }
+      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+        break;
+      }
+
+      // Add back the column with a different type.
+      {
+        unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
+        alt->AddColumn("a")->Type(KuduColumnSchema::STRING);
+        CHECK_OK(alt->Alter());
+      }
+      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+        break;
+      }
+
+      // Rename the table.
+      {
+        unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
+        alt->RenameTo(kNewTableName);
+        CHECK_OK(alt->Alter());
+      }
+      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+        break;
+      }
+
+      // Drop the renamed table.
+      CHECK_OK(client_->DeleteTable(kNewTableName));
+      if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+        break;
+      }
+    }
+  });
+  auto alterer_cleanup = MakeScopedCleanup([&]() {
+    run_latch.CountDown();
+    alterer.join();
+  });
+
+  thread timer([&]() {
+    SleepFor(MonoDelta::FromSeconds(30));
+    run_latch.CountDown();
+  });
+  auto timer_cleanup = MakeScopedCleanup([&]() {
+    timer.join();
+  });
+
+  const vector<string> tool_args = {
+    "cluster",
+    "rebalance",
+    cluster_->master()->bound_rpc_addr().ToString(),
+    "--move_single_replicas",
+  };
+
+  // Run the rebalancer concurrently with the DDL operations. The second run
+  // of the rebalancer (which starts after joining the timer thread) is
+  // necessary to balance the cluster after the DDL activity stops: that's
+  // the easiest way to make sure the rebalancer takes into account all the
+  // DDL changes that have happened.
+  //
+  // The signal to terminate the DDL activity (done via run_latch.CountDown())
+  // is sent from a separate timer thread instead of calling SleepFor() after
+  // the first run of the rebalancer followed by run_latch.CountDown(). This
+  // avoids depending on the rebalancer's behavior if it spots on-going DDL
+  // activity and keeps running over and over again.
+  for (auto i = 0; i < 2; ++i) {
+    if (i == 1) {
+      timer.join();
+      timer_cleanup.cancel();
+
+      // Wait for all the DDL activity to complete.
+      alterer.join();
+      alterer_cleanup.cancel();
+
+      deleter.join();
+      deleter_cleanup.cancel();
+
+      creator.join();
+      creator_cleanup.cancel();
+    }
+
+    string out;
+    string err;
+    const auto s = RunKuduTool(tool_args, &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+        << "stderr: " << err;
+  }
+
+  // Next (3rd) run should report the cluster as balanced and
+  // no replica movement should be attempted.
+  {
+    string out;
+    string err;
+    const auto s = RunKuduTool(tool_args, &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_STR_CONTAINS(out,
+        "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+        << "stderr: " << err;
+  }
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+}
+
+// Make sure it's safe to run multiple rebalancers concurrently. The rebalancers
+// might report errors, but they should not get stuck and the cluster should
+// remain in good shape (i.e. no crashes, no data inconsistencies). Running a
+// single rebalancer afterwards should bring the cluster to a balanced state.
+class ConcurrentRebalancersTest : public RebalancingTest {
+ public:
+  ConcurrentRebalancersTest()
+      : RebalancingTest(10 /* num_tables */) {
+  }
+};
+INSTANTIATE_TEST_CASE_P(, ConcurrentRebalancersTest,
+    ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(ConcurrentRebalancersTest, TwoConcurrentRebalancers) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  NO_FATALS(Prepare());
+
+  const vector<string> tool_args = {
+    "cluster",
+    "rebalance",
+    cluster_->master()->bound_rpc_addr().ToString(),
+  };
+
+  const auto runner_func = [&]() {
+    string err;
+    const auto s = RunKuduTool(tool_args, nullptr, &err);
+
+    ostringstream os;
+    os << "rebalancer status: " << s.ToString();
+    // A bad status might be expected here: e.g., due to some race, the
+    // rebalancer wasn't able to make progress for more than
+    // --max_staleness_interval_sec, etc.
+    if (!s.ok()) {
+      os << " : " << err;
+    }
+    LOG(INFO) << os.str();
+
+    // Should not exit on a signal: not expecting SIGSEGV, SIGABRT, etc.
+    return !MatchPattern(err, "*kudu: process exited on signal*");
+  };
+
+  CountDownLatch start_synchronizer(1);
+  vector<thread> concurrent_runners;
+  for (auto i = 0; i < 5; ++i) {
+    concurrent_runners.emplace_back([&]() {
+      start_synchronizer.Wait();
+      CHECK(runner_func());
+    });
+  }
+
+  // Run rebalancers concurrently and wait for their completion.
+  start_synchronizer.CountDown();
+  for (auto& runner : concurrent_runners) {
+    runner.join();
+  }
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+
+  {
+    string out;
+    string err;
+    // TODO(aserbin): sometimes, when replica movement fails because of
+    //   concurrent rebalancers or other reasons, the REPLACE attribute is left
+    //   in the replica's Raft config. In such cases, rebalancing fails because
+    //   it cannot make progress due to the semantics of the ChangeConfig()
+    //   method, which returns the error
+    //     'Invalid argument: must modify a field when calling MODIFY_PEER'
+    //   when attempting to set the REPLACE attribute again.
+    const auto s = RunKuduTool(tool_args, &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+        << "stderr: " << err;
+  }
+
+  // The next run should report the cluster as balanced and no replica movement
+  // should be attempted: at least one rebalancer run prior to this should have
+  // succeeded, so this run amounts to running the tool against an already
+  // balanced cluster.
+  {
+    string out;
+    string err;
+    const auto s = RunKuduTool(tool_args, &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_STR_CONTAINS(out,
+        "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+        << "stderr: " << err;
+  }
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+}
+
+// The rebalancer should stop and exit upon detecting a tablet server that
+// went down. That's a simple and effective way of preventing concurrent replica
+// movement by the rebalancer and the automatic re-replication (the catalog
+// manager tries to move replicas from the unreachable tablet server).
+class TserverGoesDownDuringRebalancingTest : public RebalancingTest {
+ public:
+  TserverGoesDownDuringRebalancingTest() :
+      RebalancingTest(5 /* num_tables */) {
+  }
+};
+INSTANTIATE_TEST_CASE_P(, TserverGoesDownDuringRebalancingTest,
+    ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(TserverGoesDownDuringRebalancingTest, TserverDown) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  const vector<string> kTserverExtraFlags = {
+    // Slow down tablet copy to make the rebalancing step run longer so it
+    // becomes observable via the tablet data states output by ksck.
+    "--tablet_copy_download_file_inject_latency_ms=1500",
+
+    "--follower_unavailable_considered_failed_sec=30",
+  };
+  NO_FATALS(Prepare(kTserverExtraFlags));
+
+  // Pre-condition: 'kudu cluster ksck' should be happy with the cluster state
+  // shortly after initial setup.
+  ASSERT_EVENTUALLY([&]() {
+    string err;
+    const auto s = RunKuduTool({
+      "cluster",
+      "ksck",
+      cluster_->master()->bound_rpc_addr().ToString(),
+    }, nullptr, &err);
+    ASSERT_TRUE(s.ok()) << "stderr: " << err;
+  });
+
+  Random r(SeedRandom());
+  const uint32_t shutdown_tserver_idx = r.Next() % num_tservers_;
+
+  atomic<bool> run(true);
+  // The thread that shuts down the selected tablet server.
+  thread stopper([&]() {
+    while (run && !IsRebalancingInProgress()) {
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+
+    // All right, it's time to stop the selected tablet server.
+    cluster_->tablet_server(shutdown_tserver_idx)->Shutdown();
+  });
+  auto stopper_cleanup = MakeScopedCleanup([&]() {
+    run = false;
+    stopper.join();
+  });
+
+  {
+    string out;
+    string err;
+    const auto s = RunKuduTool({
+      "cluster",
+      "rebalance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      // Limiting the number of replicas to move. This is to make the rebalancer
+      // run longer, making sure the rebalancing is in progress when the tablet
+      // server goes down.
+      "--max_moves_per_server=1",
+    }, &out, &err);
+    ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+    ASSERT_STR_MATCHES(
+        err, "Illegal state: tablet server .* \\(.*\\): "
+             "unacceptable health status UNAVAILABLE");
+
+    // The rebalancer tool should not crash.
+    ASSERT_STR_NOT_CONTAINS(err, "kudu: process exited on signal");
+  }
+
+  run = false;
+  stopper.join();
+  stopper_cleanup.cancel();
+
+  ASSERT_OK(cluster_->tablet_server(shutdown_tserver_idx)->Restart());
+  NO_FATALS(cluster_->AssertNoCrashes());
+}
+
+// The rebalancer should continue working and complete rebalancing successfully
+// if a new tablet server is added while the cluster is being rebalanced.
+class TserverAddedDuringRebalancingTest : public RebalancingTest {
+ public:
+  TserverAddedDuringRebalancingTest()
+      : RebalancingTest(10 /* num_tables */) {
+  }
+};
+INSTANTIATE_TEST_CASE_P(, TserverAddedDuringRebalancingTest,
+    ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(TserverAddedDuringRebalancingTest, TserverStarts) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  const vector<string> kTserverExtraFlags = {
+    // Slow down tablet copy to make the rebalancing step run longer so it
+    // becomes observable via the tablet data states output by ksck.
+    "--tablet_copy_download_file_inject_latency_ms=1500",
+
+    "--follower_unavailable_considered_failed_sec=30",
+  };
+  NO_FATALS(Prepare(kTserverExtraFlags));
+
+  const vector<string> tool_args = {
+    "cluster",
+    "rebalance",
+    cluster_->master()->bound_rpc_addr().ToString(),
+  };
+
+  atomic<bool> run(true);
+  thread runner([&]() {
+    while (run) {
+      string err;
+      const auto s = RunKuduTool(tool_args, nullptr, &err);
+      CHECK(s.ok()) << s.ToString() << ": " << err;
+    }
+  });
+  auto runner_cleanup = MakeScopedCleanup([&]() {
+    run = false;
+    runner.join();
+  });
+
+  while (!IsRebalancingInProgress()) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  // It's time to sneak in and add a new tablet server.
+  ASSERT_OK(cluster_->AddTabletServer());
+  run = false;
+  runner.join();
+  runner_cleanup.cancel();
+
+  // The rebalancer should not fail, and eventually, after a new tablet server
+  // is added, the cluster should become balanced.
+  ASSERT_EVENTUALLY([&]() {
+    string out;
+    string err;
+    const auto s = RunKuduTool(tool_args, &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    ASSERT_STR_CONTAINS(out,
+        "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+        << "stderr: " << err;
+  });
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
 }
 
 } // namespace tools