You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2018/07/04 01:25:33 UTC
kudu git commit: [tools] extra integration tests for the rebalancer
Repository: kudu
Updated Branches:
refs/heads/master 7b048b8db -> a81d80a79
[tools] extra integration tests for the rebalancer
This patch adds more integration tests for the rebalancer,
providing coverage for the following scenarios:
* a new tablet server added post-rebalancing,
and the rebalancer is run again
* DDL operations are run concurrently with the rebalancing
* two rebalancers are running concurrently
* a tablet server goes down during rebalancing
* a tablet server is added during rebalancing
Change-Id: I78b3dcea71ed303f6ecd199604b2385796d05da8
Reviewed-on: http://gerrit.cloudera.org:8080/10540
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a81d80a7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a81d80a7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a81d80a7
Branch: refs/heads/master
Commit: a81d80a7987cd66c4d0e697058ecd817d2707a9e
Parents: 7b048b8
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue May 29 12:42:43 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Jul 4 01:24:17 2018 +0000
----------------------------------------------------------------------
src/kudu/tools/CMakeLists.txt | 3 +-
src/kudu/tools/kudu-admin-test.cc | 654 ++++++++++++++++++++++++++++++---
2 files changed, 610 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/a81d80a7/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index 97d8bdb..e367d64 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -164,7 +164,8 @@ set(KUDU_TEST_LINK_LIBS
ADD_KUDU_TEST(diagnostics_log_parser-test)
ADD_KUDU_TEST(ksck-test)
ADD_KUDU_TEST(ksck_remote-test PROCESSORS 3)
-ADD_KUDU_TEST(kudu-admin-test PROCESSORS 3)
+ADD_KUDU_TEST(kudu-admin-test
+ NUM_SHARDS 4 PROCESSORS 3)
ADD_KUDU_TEST_DEPENDENCIES(kudu-admin-test
kudu)
ADD_KUDU_TEST(kudu-tool-test
http://git-wip-us.apache.org/repos/asf/kudu/blob/a81d80a7/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index 1868c90..856f1f4 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -16,6 +16,7 @@
// under the License.
#include <algorithm>
+#include <atomic>
#include <cstdint>
#include <cstdio>
#include <deque>
@@ -23,6 +24,7 @@
#include <memory>
#include <ostream>
#include <string>
+#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
@@ -46,6 +48,7 @@
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/test_workload.h"
@@ -55,10 +58,13 @@
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tools/tool_test_util.h"
#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/util/countdown_latch.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -68,7 +74,10 @@ DECLARE_int32(num_tablet_servers);
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
+using kudu::client::KuduColumnSchema;
using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
using kudu::client::sp::shared_ptr;
using kudu::cluster::ExternalTabletServer;
@@ -92,16 +101,22 @@ using kudu::itest::WaitUntilTabletInState;
using kudu::itest::WaitUntilTabletRunning;
using kudu::master::VOTER_REPLICA;
using kudu::pb_util::SecureDebugString;
+using std::atomic;
using std::back_inserter;
using std::copy;
using std::deque;
+using std::ostringstream;
using std::string;
+using std::thread;
using std::unique_ptr;
using std::vector;
using strings::Split;
using strings::Substitute;
namespace kudu {
+
+class Schema;
+
namespace tools {
class AdminCliTest : public tserver::TabletServerIntegrationTestBase {
@@ -1357,6 +1372,55 @@ TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
ASSERT_STR_MATCHES(err, err_msg_pattern);
}
+// Create tables with unbalanced replica distribution: useful in
+// rebalancer-related tests.
+static Status CreateUnbalancedTables(
+ cluster::ExternalMiniCluster* cluster,
+ client::KuduClient* client,
+ const Schema& table_schema,
+ const string& table_name_pattern,
+ int num_tables,
+ int rep_factor,
+ int tserver_idx_from,
+ int tserver_num,
+ int tserver_unresponsive_ms) {
+ // Keep running only some tablet servers and shut down the rest.
+ for (auto i = tserver_idx_from; i < tserver_num; ++i) {
+ cluster->tablet_server(i)->Shutdown();
+ }
+
+ // Wait for the catalog manager to understand that not all tablet servers
+ // are available.
+ SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
+
+ // Create tables with their tablet replicas landing only on the tablet servers
+ // which are up and running.
+ KuduSchema client_schema(client::KuduSchemaFromSchema(table_schema));
+ for (auto i = 0; i < num_tables; ++i) {
+ const string table_name = Substitute(table_name_pattern, i);
+ unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+ RETURN_NOT_OK(table_creator->table_name(table_name)
+ .schema(&client_schema)
+ .add_hash_partitions({ "key" }, 3)
+ .num_replicas(rep_factor)
+ .Create());
+ RETURN_NOT_OK(RunKuduTool({
+ "perf",
+ "loadgen",
+ cluster->master()->bound_rpc_addr().ToString(),
+ Substitute("--table_name=$0", table_name),
+ Substitute("--table_num_replicas=$0", rep_factor),
+ "--string_fixed=unbalanced_tables_test",
+ }));
+ }
+
+ for (auto i = tserver_idx_from; i < tserver_num; ++i) {
+ RETURN_NOT_OK(cluster->tablet_server(i)->Restart());
+ }
+
+ return Status::OK();
+}
+
// A test to verify that rebalancing works for both 3-4-3 and 3-2-3 replica
// management schemes. During replica movement, a light workload is run against
// every table being rebalanced. This test covers different replication factors.
@@ -1394,33 +1458,9 @@ TEST_P(RebalanceParamTest, Rebalance) {
FLAGS_num_replicas = kRepFactor;
NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
- // Keep running only (kRepFactor + 1) tablet servers and shut down the rest.
- for (auto i = kRepFactor + 1; i < kNumTservers; ++i) {
- cluster_->tablet_server(i)->Shutdown();
- }
-
- // Wait for the catalog manager to understand that only (kRepFactor + 1)
- // tablet servers are available.
- SleepFor(MonoDelta::FromMilliseconds(5 * kTserverUnresponsiveMs / 4));
-
- // Create few tables with their tablet replicas landing only on those
- // (kRepFactor + 1) running tablet servers.
- KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
- for (auto i = 0; i < kNumTables; ++i) {
- const string table_name = Substitute(table_name_pattern, i);
- unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
- ASSERT_OK(table_creator->table_name(table_name)
- .schema(&client_schema)
- .add_hash_partitions({ "key" }, 3)
- .num_replicas(kRepFactor)
- .Create());
- ASSERT_OK(RunKuduTool({
- "perf",
- "loadgen",
- cluster_->master()->bound_rpc_addr().ToString(),
- Substitute("--table_name=$0", table_name),
- }));
- }
+ ASSERT_OK(CreateUnbalancedTables(
+ cluster_.get(), client_.get(), schema_, table_name_pattern, kNumTables,
+ kRepFactor, kRepFactor + 1, kNumTservers, kTserverUnresponsiveMs));
// Workloads aren't run for 3-2-3 replica movement with RF = 1 because
// the tablet is unavailable during the move until the target voter replica
@@ -1449,21 +1489,20 @@ TEST_P(RebalanceParamTest, Rebalance) {
}
}
- for (auto i = (kRepFactor + 1); i < kNumTservers; ++i) {
- ASSERT_OK(cluster_->tablet_server(i)->Restart());
- }
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ "--move_single_replicas",
+ };
{
string out;
string err;
- const Status s = RunKuduTool({
- "cluster",
- "rebalance",
- cluster_->master()->bound_rpc_addr().ToString(),
- "--move_single_replicas",
- }, &out, &err);
+ const Status s = RunKuduTool(tool_args, &out, &err);
ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err;
- ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced");
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << "stderr: " << err;
}
// Next run should report the cluster as balanced and no replica movement
@@ -1471,15 +1510,11 @@ TEST_P(RebalanceParamTest, Rebalance) {
{
string out;
string err;
- const Status s = RunKuduTool({
- "cluster",
- "rebalance",
- cluster_->master()->bound_rpc_addr().ToString(),
- "--move_single_replicas",
- }, &out, &err);
+ const Status s = RunKuduTool(tool_args, &out, &err);
ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err;
ASSERT_STR_CONTAINS(out,
- "rebalancing is complete: cluster is balanced (moved 0 replicas)");
+ "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+ << "stderr: " << err;
}
for (auto& workload : workloads) {
@@ -1487,9 +1522,536 @@ TEST_P(RebalanceParamTest, Rebalance) {
}
NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
- ClusterVerifier v(cluster_.get());
- NO_FATALS(v.CheckCluster());
+ // Now add a new tablet server into the cluster and make sure the rebalancer
+ // will re-distribute replicas.
+ ASSERT_OK(cluster_->AddTabletServer());
+ {
+ string out;
+ string err;
+ const Status s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err;
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << "stderr: " << err;
+ // The cluster is un-balanced, so many replicas should have been moved.
+ ASSERT_STR_NOT_CONTAINS(out, "(moved 0 replicas)");
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+}
+
+// Common base for the rebalancer-related test below.
+class RebalancingTest :
+ public tserver::TabletServerIntegrationTestBase,
+ public ::testing::WithParamInterface<Kudu1097> {
+ public:
+ RebalancingTest(int num_tables = 10,
+ int rep_factor = 3,
+ int num_tservers = 8,
+ int tserver_unresponsive_ms = 3000,
+ const string& table_name_pattern = "rebalance_test_table_$0")
+ : TabletServerIntegrationTestBase(),
+ is_343_scheme_(GetParam() == Kudu1097::Enable),
+ num_tables_(num_tables),
+ rep_factor_(rep_factor),
+ num_tservers_(num_tservers),
+ tserver_unresponsive_ms_(tserver_unresponsive_ms),
+ table_name_pattern_(table_name_pattern) {
+ master_flags_ = {
+ Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_),
+ Substitute("--tserver_unresponsive_timeout_ms=$0", tserver_unresponsive_ms_),
+ };
+ tserver_flags_ = {
+ Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_),
+ };
+ }
+
+ protected:
+ void Prepare(const vector<string>& extra_tserver_flags = {},
+ const vector<string>& extra_master_flags = {}) {
+ copy(extra_tserver_flags.begin(), extra_tserver_flags.end(),
+ back_inserter(tserver_flags_));
+ copy(extra_master_flags.begin(), extra_master_flags.end(),
+ back_inserter(master_flags_));
+
+ FLAGS_num_tablet_servers = num_tservers_;
+ FLAGS_num_replicas = rep_factor_;
+ NO_FATALS(BuildAndStart(tserver_flags_, master_flags_));
+
+ ASSERT_OK(CreateUnbalancedTables(
+ cluster_.get(), client_.get(), schema_, table_name_pattern_,
+ num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
+ tserver_unresponsive_ms_));
+ }
+
+ // When the rebalancer starts moving replicas, ksck detects corruption
+ // (that's why RuntimeError), seeing affected tables as non-healthy
+ // with data state of corresponding tablets as TABLET_DATA_COPYING. If using
+ // this method, it's a good idea to inject some latency into tablet copying
+ // to be able to spot the TABLET_DATA_COPYING state, see the
+ // '--tablet_copy_download_file_inject_latency_ms' flag for tservers.
+ bool IsRebalancingInProgress() {
+ string out;
+ const auto s = RunKuduTool({
+ "cluster",
+ "ksck",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ }, &out);
+ if (s.IsRuntimeError() &&
+ out.find("Data state: TABLET_DATA_COPYING") != string::npos) {
+ return true;
+ }
+ return false;
+ }
+
+ const bool is_343_scheme_;
+ const int num_tables_;
+ const int rep_factor_;
+ const int num_tservers_;
+ const int tserver_unresponsive_ms_;
+ const string table_name_pattern_;
+ vector<string> tserver_flags_;
+ vector<string> master_flags_;
+};
+
+// Make sure the rebalancer is able to do its job if running concurrently
+// with DDL activity on the cluster.
+class DDLDuringRebalancingTest : public RebalancingTest {
+ public:
+ DDLDuringRebalancingTest()
+ : RebalancingTest(20 /* num_tables */) {
+ }
+};
+INSTANTIATE_TEST_CASE_P(, DDLDuringRebalancingTest,
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(DDLDuringRebalancingTest, TablesCreatedAndDeletedDuringRebalancing) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ NO_FATALS(Prepare());
+
+ // The latch that controls the lifecycle of the concurrent DDL activity.
+ CountDownLatch run_latch(1);
+
+ thread creator([&]() {
+ KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
+ for (auto idx = 0; ; ++idx) {
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
+ break;
+ }
+ const string table_name = Substitute("rebalancer_extra_table_$0", idx++);
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ CHECK_OK(table_creator->table_name(table_name)
+ .schema(&client_schema)
+ .add_hash_partitions({ "key" }, 3)
+ .num_replicas(rep_factor_)
+ .Create());
+ }
+ });
+ auto creator_cleanup = MakeScopedCleanup([&]() {
+ run_latch.CountDown();
+ creator.join();
+ });
+
+ thread deleter([&]() {
+ for (auto idx = 0; idx < num_tables_; ++idx) {
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
+ break;
+ }
+ CHECK_OK(client_->DeleteTable(Substitute(table_name_pattern_, idx++)));
+ }
+ });
+ auto deleter_cleanup = MakeScopedCleanup([&]() {
+ run_latch.CountDown();
+ deleter.join();
+ });
+
+ thread alterer([&]() {
+ const string kTableName = "rebalancer_dynamic_table";
+ const string kNewTableName = "rebalancer_dynamic_table_new_name";
+ while (true) {
+ // Create table.
+ {
+ KuduSchema schema;
+ KuduSchemaBuilder builder;
+ builder.AddColumn("key")->Type(KuduColumnSchema::INT64)->
+ NotNull()->
+ PrimaryKey();
+ builder.AddColumn("a")->Type(KuduColumnSchema::INT64);
+ CHECK_OK(builder.Build(&schema));
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ CHECK_OK(table_creator->table_name(kTableName)
+ .schema(&schema)
+ .set_range_partition_columns({})
+ .num_replicas(rep_factor_)
+ .Create());
+ }
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+ break;
+ }
+
+ // Drop a column.
+ {
+ unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
+ alt->DropColumn("a");
+ CHECK_OK(alt->Alter());
+ }
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+ break;
+ }
+
+ // Add back the column with different type.
+ {
+ unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
+ alt->AddColumn("a")->Type(KuduColumnSchema::STRING);
+ CHECK_OK(alt->Alter());
+ }
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+ break;
+ }
+
+ // Rename the table.
+ {
+ unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
+ alt->RenameTo(kNewTableName);
+ CHECK_OK(alt->Alter());
+ }
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+ break;
+ }
+
+ // Drop the renamed table.
+ CHECK_OK(client_->DeleteTable(kNewTableName));
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+ break;
+ }
+ }
+ });
+ auto alterer_cleanup = MakeScopedCleanup([&]() {
+ run_latch.CountDown();
+ alterer.join();
+ });
+
+ thread timer([&]() {
+ SleepFor(MonoDelta::FromSeconds(30));
+ run_latch.CountDown();
+ });
+ auto timer_cleanup = MakeScopedCleanup([&]() {
+ timer.join();
+ });
+
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ "--move_single_replicas",
+ };
+
+ // Run the rebalancer concurrently with the DDL operations. The second run
+ // of the rebalancer (the second run starts after joining the timer thread)
+ // is necessary to balance the cluster after the DDL activity stops: that's
+ // the easiest way to make sure the rebalancer will take into account
+ // all DDL changes that happened.
+ //
+ // The signal to terminate the DDL activity (done via run_latch.CountDown())
+ // is sent from a separate timer thread instead of doing SleepFor() after
+ // the first run of the rebalancer followed by run_latch.CountDown().
+ // That's to avoid dependency on the rebalancer behavior if it spots on-going
+ // DDL activity and continues running over and over again.
+ for (auto i = 0; i < 2; ++i) {
+ if (i == 1) {
+ timer.join();
+ timer_cleanup.cancel();
+
+ // Wait for all the DDL activity to complete.
+ alterer.join();
+ alterer_cleanup.cancel();
+
+ deleter.join();
+ deleter_cleanup.cancel();
+
+ creator.join();
+ creator_cleanup.cancel();
+ }
+
+ string out;
+ string err;
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << "stderr: " << err;
+ }
+
+ // Next (3rd) run should report the cluster as balanced and
+ // no replica movement should be attempted.
+ {
+ string out;
+ string err;
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_STR_CONTAINS(out,
+ "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+ << "stderr: " << err;
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+}
+
+// Make sure it's safe to run multiple rebalancers concurrently. The rebalancers
+// might report errors, but they should not get stuck and the cluster should
+// remain in good shape (i.e. no crashes, no data inconsistencies). Re-running a
+// single rebalancer session again should bring the cluster to a balanced state.
+class ConcurrentRebalancersTest : public RebalancingTest {
+ public:
+ ConcurrentRebalancersTest()
+ : RebalancingTest(10 /* num_tables */) {
+ }
+};
+INSTANTIATE_TEST_CASE_P(, ConcurrentRebalancersTest,
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(ConcurrentRebalancersTest, TwoConcurrentRebalancers) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ NO_FATALS(Prepare());
+
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ };
+
+ const auto runner_func = [&]() {
+ string err;
+ const auto s = RunKuduTool(tool_args, nullptr, &err);
+
+ ostringstream os;
+ os << "rebalancer status: " << s.ToString();
+ // One might expect a bad status returned: e.g., due to some race so
+ // the rebalancer wasn't able to make progress for more than
+ // --max_staleness_interval_sec, etc.
+ if (!s.ok()) {
+ os << " : " << err;
+ }
+ LOG(INFO) << os.str();
+
+ // Should not exit on a signal: not expecting SIGSEGV, SIGABRT, etc.
+ return !MatchPattern(err, "*kudu: process exited on signal*");
+ };
+
+ CountDownLatch start_synchronizer(1);
+ vector<thread> concurrent_runners;
+ for (auto i = 0; i < 5; ++i) {
+ concurrent_runners.emplace_back([&]() {
+ start_synchronizer.Wait();
+ CHECK(runner_func());
+ });
+ }
+
+ // Run rebalancers concurrently and wait for their completion.
+ start_synchronizer.CountDown();
+ for (auto& runner : concurrent_runners) {
+ runner.join();
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+
+ {
+ string out;
+ string err;
+ // TODO(aserbin): sometimes, when replica movement fails because of
+ // concurrent rebalancers or other reasons, the REPLACE attribute is left
+ // in replica's Raft config. In such cases, rebalancing fails because
+ // it cannot make progress due to the semantics of the ChangeConfig()
+ // method, returning error
+ // 'Invalid argument: must modify a field when calling MODIFY_PEER'
+ // in attempt to set REPLACE attribute again.
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << "stderr: " << err;
+ }
+
+ // Next run should report the cluster as balanced and no replica movement
+ // should be attempted: at least one run of the rebalancer prior to this
+ // should succeed, so next run is about running the tool against already
+ // balanced cluster.
+ {
+ string out;
+ string err;
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_STR_CONTAINS(out,
+ "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+ << "stderr: " << err;
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+}
+
+// The rebalancer should stop and exit upon detecting a tablet server that
+// went down. That's a simple and effective way of preventing concurrent replica
+// movement by the rebalancer and the automatic re-replication (the catalog
+// manager tries to move replicas from the unreachable tablet server).
+class TserverGoesDownDuringRebalancingTest : public RebalancingTest {
+ public:
+ TserverGoesDownDuringRebalancingTest() :
+ RebalancingTest(5 /* num_tables */) {
+ }
+};
+INSTANTIATE_TEST_CASE_P(, TserverGoesDownDuringRebalancingTest,
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(TserverGoesDownDuringRebalancingTest, TserverDown) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ const vector<string> kTserverExtraFlags = {
+ // Slow down tablet copy to make the rebalancing step run longer
+ // and become observable via tablet data states output by ksck.
+ "--tablet_copy_download_file_inject_latency_ms=1500",
+
+ "--follower_unavailable_considered_failed_sec=30",
+ };
+ NO_FATALS(Prepare(kTserverExtraFlags));
+
+ // Pre-condition: 'kudu cluster ksck' should be happy with the cluster state
+ // shortly after initial setup.
+ ASSERT_EVENTUALLY([&]() {
+ string err;
+ const auto s = RunKuduTool({
+ "cluster",
+ "ksck",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ }, nullptr, &err);
+ ASSERT_TRUE(s.ok()) << "stderr: " << err;
+ });
+
+ Random r(SeedRandom());
+ const uint32_t shutdown_tserver_idx = r.Next() % num_tservers_;
+
+ atomic<bool> run(true);
+ // The thread that shuts down the selected tablet server.
+ thread stopper([&]() {
+ while (run && !IsRebalancingInProgress()) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+
+ // All right, it's time to stop the selected tablet server.
+ cluster_->tablet_server(shutdown_tserver_idx)->Shutdown();
+ });
+ auto stopper_cleanup = MakeScopedCleanup([&]() {
+ run = false;
+ stopper.join();
+ });
+
+ {
+ string out;
+ string err;
+ const auto s = RunKuduTool({
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ // Limiting the number of replicas to move. This is to make the rebalancer
+ // run longer, making sure the rebalancing is in progress when the tablet
+ // server goes down.
+ "--max_moves_per_server=1",
+ }, &out, &err);
+ ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+ ASSERT_STR_MATCHES(
+ err, "Illegal state: tablet server .* \\(.*\\): "
+ "unacceptable health status UNAVAILABLE");
+
+ // The rebalancer tool should not crash.
+ ASSERT_STR_NOT_CONTAINS(err, "kudu: process exited on signal");
+ }
+
+ run = false;
+ stopper.join();
+ stopper_cleanup.cancel();
+
+ ASSERT_OK(cluster_->tablet_server(shutdown_tserver_idx)->Restart());
+ NO_FATALS(cluster_->AssertNoCrashes());
+}
+
+// The rebalancer should continue working and complete rebalancing successfully
+// if a new tablet server is added while the cluster is being rebalanced.
+class TserverAddedDuringRebalancingTest : public RebalancingTest {
+ public:
+ TserverAddedDuringRebalancingTest()
+ : RebalancingTest(10 /* num_tables */) {
+ }
+};
+INSTANTIATE_TEST_CASE_P(, TserverAddedDuringRebalancingTest,
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(TserverAddedDuringRebalancingTest, TserverStarts) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ const vector<string> kTserverExtraFlags = {
+ // Slow down tablet copy to make the rebalancing step run longer
+ // and become observable via tablet data states output by ksck.
+ "--tablet_copy_download_file_inject_latency_ms=1500",
+
+ "--follower_unavailable_considered_failed_sec=30",
+ };
+ NO_FATALS(Prepare(kTserverExtraFlags));
+
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ };
+
+ atomic<bool> run(true);
+ thread runner([&]() {
+ while (run) {
+ string err;
+ const auto s = RunKuduTool(tool_args, nullptr, &err);
+ CHECK(s.ok()) << s.ToString() << "stderr: " << err;
+ }
+ });
+ auto runner_cleanup = MakeScopedCleanup([&]() {
+ run = false;
+ runner.join();
+ });
+
+ while (!IsRebalancingInProgress()) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+
+ // It's time to sneak in and add new tablet server.
+ ASSERT_OK(cluster_->AddTabletServer());
+ run = false;
+ runner.join();
+ runner_cleanup.cancel();
+
+ // The rebalancer should not fail, and eventually, after a new tablet server
+ // is added, the cluster should become balanced.
+ ASSERT_EVENTUALLY([&]() {
+ string out;
+ string err;
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ ASSERT_STR_CONTAINS(out,
+ "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+ << "stderr: " << err;
+ });
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
}
} // namespace tools