You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2018/10/11 17:18:28 UTC
[2/2] kudu git commit: [tools] separated rebalancer tests from
kudu-admin-test
[tools] separated rebalancer tests from kudu-admin-test
This changelist separates rebalancer-related integration tests
from kudu-admin-test into rebalancer_tool-test. It also contains some
other refactoring, such as preparing the RebalancingTest to be used as
non-parameterized base class for 3-4-3-only test scenarios.
Change-Id: Ia9e0ed25f2dae789f2ef56095d239a7e78139e77
Reviewed-on: http://gerrit.cloudera.org:8080/11640
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/66b0a4cc
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/66b0a4cc
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/66b0a4cc
Branch: refs/heads/master
Commit: 66b0a4ccaa714834cd65c0f391a37007f0268b98
Parents: 159c695
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue Oct 9 15:54:00 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Thu Oct 11 17:00:54 2018 +0000
----------------------------------------------------------------------
src/kudu/tools/CMakeLists.txt | 4 +
src/kudu/tools/kudu-admin-test.cc | 1021 +-----------------------
src/kudu/tools/rebalancer_tool-test.cc | 1148 +++++++++++++++++++++++++++
src/kudu/tools/tool_test_util.h | 5 +
4 files changed, 1163 insertions(+), 1015 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/66b0a4cc/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index c9c7db3..890e0fd 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -181,4 +181,8 @@ ADD_KUDU_TEST_DEPENDENCIES(kudu-ts-cli-test
kudu)
ADD_KUDU_TEST(rebalance-test)
ADD_KUDU_TEST(rebalance_algo-test)
+ADD_KUDU_TEST(rebalancer_tool-test
+ NUM_SHARDS 8 PROCESSORS 3)
+ADD_KUDU_TEST_DEPENDENCIES(rebalancer_tool-test
+ kudu)
ADD_KUDU_TEST(tool_action-test)
http://git-wip-us.apache.org/repos/asf/kudu/blob/66b0a4cc/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index ff7e84c..7702777 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -19,7 +19,6 @@
#include <atomic>
#include <cstdint>
#include <cstdio>
-#include <cstdlib>
#include <deque>
#include <iterator>
#include <memory>
@@ -42,7 +41,6 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
-#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/quorum_util.h"
@@ -56,23 +54,22 @@
#include "kudu/integration-tests/ts_itest-base.h"
#include "kudu/master/master.pb.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
-#include "kudu/rpc/rpc_controller.h"
#include "kudu/tablet/metadata.pb.h"
-#include "kudu/tablet/tablet.pb.h"
#include "kudu/tools/tool_test_util.h"
#include "kudu/tserver/tablet_server-test-base.h"
-#include "kudu/tserver/tserver.pb.h"
-#include "kudu/util/countdown_latch.h"
#include "kudu/util/monotime.h"
-#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
-#include "kudu/util/random.h"
-#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+namespace kudu {
+namespace tserver {
+class ListTabletsResponsePB;
+} // namespace tserver
+} // namespace kudu
+
DECLARE_int32(num_replicas);
DECLARE_int32(num_tablet_servers);
@@ -122,8 +119,6 @@ using strings::Substitute;
namespace kudu {
-class Schema;
-
namespace tools {
// Helper to format info when a tool action fails.
@@ -276,10 +271,6 @@ TEST_F(AdminCliTest, TestChangeConfig) {
MonoDelta::FromSeconds(10)));
}
-enum class Kudu1097 {
- Disable,
- Enable,
-};
enum class DownTS {
None,
TabletPeer,
@@ -1366,1005 +1357,5 @@ TEST_F(AdminCliTest, TestListTablesDetail) {
}
}
-TEST_F(AdminCliTest, RebalancerReportOnly) {
- static const char kReferenceOutput[] =
- R"***(Per-server replica distribution summary:
- Statistic | Value
------------------------+----------
- Minimum Replica Count | 0
- Maximum Replica Count | 1
- Average Replica Count | 0.600000
-
-Per-table replica distribution summary:
- Replica Skew | Value
---------------+----------
- Minimum | 1
- Maximum | 1
- Average | 1.000000)***";
-
- FLAGS_num_tablet_servers = 5;
- NO_FATALS(BuildAndStart());
-
- string out;
- string err;
- Status s = RunKuduTool({
- "cluster",
- "rebalance",
- cluster_->master()->bound_rpc_addr().ToString(),
- "--report_only",
- }, &out, &err);
- ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
- // The rebalancer should report on tablet replica distribution. The output
- // should match the reference report: the distribution of the replicas
- // is 100% repeatable given the number of tables created by the test,
- // the replication factor and the number of tablet servers.
- ASSERT_STR_CONTAINS(out, kReferenceOutput);
- // The actual rebalancing should not run.
- ASSERT_STR_NOT_CONTAINS(out, "rebalancing is complete:")
- << ToolRunInfo(s, out, err);
-}
-
-// Make sure the rebalancer doesn't start if a tablet server is down.
-class RebalanceStartCriteriaTest :
- public AdminCliTest,
- public ::testing::WithParamInterface<Kudu1097> {
-};
-INSTANTIATE_TEST_CASE_P(, RebalanceStartCriteriaTest,
- ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
-TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
- const bool is_343_scheme = (GetParam() == Kudu1097::Enable);
- const vector<string> kMasterFlags = {
- Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
- };
- const vector<string> kTserverFlags = {
- Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
- };
-
- FLAGS_num_tablet_servers = 5;
- NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
-
- // Shutdown one of the tablet servers.
- HostPort ts_host_port;
- {
- auto* ts = cluster_->tablet_server(0);
- ASSERT_NE(nullptr, ts);
- ts_host_port = ts->bound_rpc_hostport();
- ts->Shutdown();
- }
-
- string out;
- string err;
- Status s = RunKuduTool({
- "cluster",
- "rebalance",
- cluster_->master()->bound_rpc_addr().ToString()
- }, &out, &err);
- ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
- const auto err_msg_pattern = Substitute(
- "Illegal state: tablet server .* \\($0\\): "
- "unacceptable health status UNAVAILABLE",
- ts_host_port.ToString());
- ASSERT_STR_MATCHES(err, err_msg_pattern);
-}
-
-// Create tables with unbalanced replica distribution: useful in
-// rebalancer-related tests.
-static Status CreateUnbalancedTables(
- cluster::ExternalMiniCluster* cluster,
- client::KuduClient* client,
- const Schema& table_schema,
- const string& table_name_pattern,
- int num_tables,
- int rep_factor,
- int tserver_idx_from,
- int tserver_num,
- int tserver_unresponsive_ms) {
- // Keep running only some tablet servers and shut down the rest.
- for (auto i = tserver_idx_from; i < tserver_num; ++i) {
- cluster->tablet_server(i)->Shutdown();
- }
-
- // Wait for the catalog manager to understand that not all tablet servers
- // are available.
- SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
-
- // Create tables with their tablet replicas landing only on the tablet servers
- // which are up and running.
- KuduSchema client_schema(client::KuduSchemaFromSchema(table_schema));
- for (auto i = 0; i < num_tables; ++i) {
- const string table_name = Substitute(table_name_pattern, i);
- unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
- RETURN_NOT_OK(table_creator->table_name(table_name)
- .schema(&client_schema)
- .add_hash_partitions({ "key" }, 3)
- .num_replicas(rep_factor)
- .Create());
- RETURN_NOT_OK(RunKuduTool({
- "perf",
- "loadgen",
- cluster->master()->bound_rpc_addr().ToString(),
- Substitute("--table_name=$0", table_name),
- Substitute("--table_num_replicas=$0", rep_factor),
- "--string_fixed=unbalanced_tables_test",
- }));
- }
-
- for (auto i = tserver_idx_from; i < tserver_num; ++i) {
- RETURN_NOT_OK(cluster->tablet_server(i)->Restart());
- }
-
- return Status::OK();
-}
-
-// A test to verify that rebalancing works for both 3-4-3 and 3-2-3 replica
-// management schemes. During replica movement, a light workload is run against
-// every table being rebalanced. This test covers different replication factors.
-class RebalanceParamTest :
- public AdminCliTest,
- public ::testing::WithParamInterface<tuple<int, Kudu1097>> {
-};
-INSTANTIATE_TEST_CASE_P(, RebalanceParamTest,
- ::testing::Combine(::testing::Values(1, 2, 3, 5),
- ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)));
-TEST_P(RebalanceParamTest, Rebalance) {
- if (!AllowSlowTests()) {
- LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
- return;
- }
-
- const auto& param = GetParam();
- const auto kRepFactor = std::get<0>(param);
- const auto is_343_scheme = (std::get<1>(param) == Kudu1097::Enable);
- constexpr auto kNumTservers = 7;
- constexpr auto kNumTables = 5;
- const string table_name_pattern = "rebalance_test_table_$0";
- constexpr auto kTserverUnresponsiveMs = 3000;
- const auto timeout = MonoDelta::FromSeconds(30);
- const vector<string> kMasterFlags = {
- "--allow_unsafe_replication_factor",
- Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
- Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs),
- };
- const vector<string> kTserverFlags = {
- Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
- };
-
- FLAGS_num_tablet_servers = kNumTservers;
- FLAGS_num_replicas = kRepFactor;
- NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
-
- ASSERT_OK(CreateUnbalancedTables(
- cluster_.get(), client_.get(), schema_, table_name_pattern, kNumTables,
- kRepFactor, kRepFactor + 1, kNumTservers, kTserverUnresponsiveMs));
-
- // Workloads aren't run for 3-2-3 replica movement with RF = 1 because
- // the tablet is unavailable during the move until the target voter replica
- // is up and running. That might take some time, and to avoid flakiness or
- // setting longer timeouts, RF=1 replicas are moved with no concurrent
- // workload running.
- //
- // TODO(aserbin): clarify why even with 3-4-3 it's a bit flaky now.
- vector<unique_ptr<TestWorkload>> workloads;
- //if (kRepFactor > 1 || is_343_scheme) {
- if (kRepFactor > 1) {
- for (auto i = 0; i < kNumTables; ++i) {
- const string table_name = Substitute(table_name_pattern, i);
- // The workload is light (1 thread, 1 op batches) so that new replicas
- // bootstrap and converge quickly.
- unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get()));
- workload->set_table_name(table_name);
- workload->set_num_replicas(kRepFactor);
- workload->set_num_write_threads(1);
- workload->set_write_batch_size(1);
- workload->set_write_timeout_millis(timeout.ToMilliseconds());
- workload->set_already_present_allowed(true);
- workload->Setup();
- workload->Start();
- workloads.emplace_back(std::move(workload));
- }
- }
-
- const vector<string> tool_args = {
- "cluster",
- "rebalance",
- cluster_->master()->bound_rpc_addr().ToString(),
- "--move_single_replicas=enabled",
- };
-
- {
- string out;
- string err;
- const Status s = RunKuduTool(tool_args, &out, &err);
- ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
- ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
- << "stderr: " << err;
- }
-
- // Next run should report the cluster as balanced and no replica movement
- // should be attempted.
- {
- string out;
- string err;
- const Status s = RunKuduTool(tool_args, &out, &err);
- ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
- ASSERT_STR_CONTAINS(out,
- "rebalancing is complete: cluster is balanced (moved 0 replicas)")
- << "stderr: " << err;
- }
-
- for (auto& workload : workloads) {
- workload->StopAndJoin();
- }
-
- NO_FATALS(cluster_->AssertNoCrashes());
- NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
-
- // Now add a new tablet server into the cluster and make sure the rebalancer
- // will re-distribute replicas.
- ASSERT_OK(cluster_->AddTabletServer());
- {
- string out;
- string err;
- const Status s = RunKuduTool(tool_args, &out, &err);
- ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
- ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
- << "stderr: " << err;
- // The cluster was un-balanced, so many replicas should have been moved.
- ASSERT_STR_NOT_CONTAINS(out, "(moved 0 replicas)");
- }
-
- NO_FATALS(cluster_->AssertNoCrashes());
- NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
-}
-
-// Common base for the rebalancer-related test below.
-class RebalancingTest :
- public tserver::TabletServerIntegrationTestBase,
- public ::testing::WithParamInterface<Kudu1097> {
- public:
- RebalancingTest(int num_tables = 10,
- int rep_factor = 3,
- int num_tservers = 8,
- int tserver_unresponsive_ms = 3000,
- const string& table_name_pattern = "rebalance_test_table_$0")
- : TabletServerIntegrationTestBase(),
- is_343_scheme_(GetParam() == Kudu1097::Enable),
- num_tables_(num_tables),
- rep_factor_(rep_factor),
- num_tservers_(num_tservers),
- tserver_unresponsive_ms_(tserver_unresponsive_ms),
- table_name_pattern_(table_name_pattern) {
- master_flags_ = {
- Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_),
- Substitute("--tserver_unresponsive_timeout_ms=$0", tserver_unresponsive_ms_),
- };
- tserver_flags_ = {
- Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_),
- };
- }
-
- protected:
- static const char* const kExitOnSignalStr;
-
- void Prepare(const vector<string>& extra_tserver_flags = {},
- const vector<string>& extra_master_flags = {}) {
- copy(extra_tserver_flags.begin(), extra_tserver_flags.end(),
- back_inserter(tserver_flags_));
- copy(extra_master_flags.begin(), extra_master_flags.end(),
- back_inserter(master_flags_));
-
- FLAGS_num_tablet_servers = num_tservers_;
- FLAGS_num_replicas = rep_factor_;
- NO_FATALS(BuildAndStart(tserver_flags_, master_flags_));
-
- ASSERT_OK(CreateUnbalancedTables(
- cluster_.get(), client_.get(), schema_, table_name_pattern_,
- num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
- tserver_unresponsive_ms_));
- }
-
- // When the rebalancer starts moving replicas, ksck detects corruption
- // (that's why RuntimeError), seeing affected tables as non-healthy
- // with data state of corresponding tablets as TABLET_DATA_COPYING. If using
- // this method, it's a good idea to inject some latency into tablet copying
- // to be able to spot the TABLET_DATA_COPYING state, see the
- // '--tablet_copy_download_file_inject_latency_ms' flag for tservers.
- bool IsRebalancingInProgress() {
- string out;
- const auto s = RunKuduTool({
- "cluster",
- "ksck",
- cluster_->master()->bound_rpc_addr().ToString(),
- }, &out);
- if (s.IsRuntimeError() &&
- out.find("Data state: TABLET_DATA_COPYING") != string::npos) {
- return true;
- }
- return false;
- }
-
- const bool is_343_scheme_;
- const int num_tables_;
- const int rep_factor_;
- const int num_tservers_;
- const int tserver_unresponsive_ms_;
- const string table_name_pattern_;
- vector<string> tserver_flags_;
- vector<string> master_flags_;
-};
-const char* const RebalancingTest::kExitOnSignalStr = "kudu: process exited on signal";
-
-// Make sure the rebalancer is able to do its job if running concurrently
-// with DDL activity on the cluster.
-class DDLDuringRebalancingTest : public RebalancingTest {
- public:
- DDLDuringRebalancingTest()
- : RebalancingTest(20 /* num_tables */) {
- }
-};
-INSTANTIATE_TEST_CASE_P(, DDLDuringRebalancingTest,
- ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
-TEST_P(DDLDuringRebalancingTest, TablesCreatedAndDeletedDuringRebalancing) {
- if (!AllowSlowTests()) {
- LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
- return;
- }
-
- NO_FATALS(Prepare());
-
- // The latch that controls the lifecycle of the concurrent DDL activity.
- CountDownLatch run_latch(1);
-
- thread creator([&]() {
- KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
- for (auto idx = 0; ; ++idx) {
- if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
- break;
- }
- const string table_name = Substitute("rebalancer_extra_table_$0", idx++);
- unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
- CHECK_OK(table_creator->table_name(table_name)
- .schema(&client_schema)
- .add_hash_partitions({ "key" }, 3)
- .num_replicas(rep_factor_)
- .Create());
- }
- });
- auto creator_cleanup = MakeScopedCleanup([&]() {
- run_latch.CountDown();
- creator.join();
- });
-
- thread deleter([&]() {
- for (auto idx = 0; idx < num_tables_; ++idx) {
- if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
- break;
- }
- CHECK_OK(client_->DeleteTable(Substitute(table_name_pattern_, idx++)));
- }
- });
- auto deleter_cleanup = MakeScopedCleanup([&]() {
- run_latch.CountDown();
- deleter.join();
- });
-
- thread alterer([&]() {
- const string kTableName = "rebalancer_dynamic_table";
- const string kNewTableName = "rebalancer_dynamic_table_new_name";
- while (true) {
- // Create table.
- {
- KuduSchema schema;
- KuduSchemaBuilder builder;
- builder.AddColumn("key")->Type(KuduColumnSchema::INT64)->
- NotNull()->
- PrimaryKey();
- builder.AddColumn("a")->Type(KuduColumnSchema::INT64);
- CHECK_OK(builder.Build(&schema));
- unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
- CHECK_OK(table_creator->table_name(kTableName)
- .schema(&schema)
- .set_range_partition_columns({})
- .num_replicas(rep_factor_)
- .Create());
- }
- if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
- break;
- }
-
- // Drop a column.
- {
- unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
- alt->DropColumn("a");
- CHECK_OK(alt->Alter());
- }
- if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
- break;
- }
-
- // Add back the column with different type.
- {
- unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
- alt->AddColumn("a")->Type(KuduColumnSchema::STRING);
- CHECK_OK(alt->Alter());
- }
- if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
- break;
- }
-
- // Rename the table.
- {
- unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
- alt->RenameTo(kNewTableName);
- CHECK_OK(alt->Alter());
- }
- if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
- break;
- }
-
- // Drop the renamed table.
- CHECK_OK(client_->DeleteTable(kNewTableName));
- if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
- break;
- }
- }
- });
- auto alterer_cleanup = MakeScopedCleanup([&]() {
- run_latch.CountDown();
- alterer.join();
- });
-
- thread timer([&]() {
- SleepFor(MonoDelta::FromSeconds(30));
- run_latch.CountDown();
- });
- auto timer_cleanup = MakeScopedCleanup([&]() {
- timer.join();
- });
-
- const vector<string> tool_args = {
- "cluster",
- "rebalance",
- cluster_->master()->bound_rpc_addr().ToString(),
- };
-
- // Run the rebalancer concurrently with the DDL operations. The second run
- // of the rebalancer (the second run starts after joining the timer thread)
- // is necessary to balance the cluster after the DDL activity stops: that's
- // the easiest way to make sure the rebalancer will take into account
- // all DDL changes that happened.
- //
- // The signal to terminate the DDL activity (done via run_latch.CountDown())
- // is sent from a separate timer thread instead of doing SleepFor() after
- // the first run of the rebalancer followed by run_latch.CountDown().
- // That's to avoid dependency on the rebalancer behavior if it spots on-going
- // DDL activity and continues running over and over again.
- for (auto i = 0; i < 2; ++i) {
- if (i == 1) {
- timer.join();
- timer_cleanup.cancel();
-
- // Wait for all the DDL activity to complete.
- alterer.join();
- alterer_cleanup.cancel();
-
- deleter.join();
- deleter_cleanup.cancel();
-
- creator.join();
- creator_cleanup.cancel();
- }
-
- string out;
- string err;
- const auto s = RunKuduTool(tool_args, &out, &err);
- ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
- ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
- << "stderr: " << err;
- }
-
- // Next (3rd) run should report the cluster as balanced and
- // no replica movement should be attempted.
- {
- string out;
- string err;
- const auto s = RunKuduTool(tool_args, &out, &err);
- ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
- ASSERT_STR_CONTAINS(out,
- "rebalancing is complete: cluster is balanced (moved 0 replicas)")
- << "stderr: " << err;
- }
-
- NO_FATALS(cluster_->AssertNoCrashes());
- NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
-}
-
-// Make sure it's safe to run multiple rebalancers concurrently. The rebalancers
-// might report errors, but they should not get stuck and the cluster should
-// remain in good shape (i.e. no crashes, no data inconsistencies). Re-running a
-// single rebalancer session again should bring the cluster to a balanced state.
-class ConcurrentRebalancersTest : public RebalancingTest {
- public:
- ConcurrentRebalancersTest()
- : RebalancingTest(10 /* num_tables */) {
- }
-};
-INSTANTIATE_TEST_CASE_P(, ConcurrentRebalancersTest,
- ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
-TEST_P(ConcurrentRebalancersTest, TwoConcurrentRebalancers) {
- if (!AllowSlowTests()) {
- LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
- return;
- }
-
- NO_FATALS(Prepare());
-
- const vector<string> tool_args = {
- "cluster",
- "rebalance",
- cluster_->master()->bound_rpc_addr().ToString(),
- };
-
- const auto runner_func = [&]() {
- string out;
- string err;
- const auto s = RunKuduTool(tool_args, &out, &err);
- if (!s.ok()) {
- // One might expect a bad status returned: e.g., due to some race so
- // the rebalancer didn't able to make progress for more than
- // --max_staleness_interval_sec, etc.
- LOG(INFO) << "rebalancer run info: " << ToolRunInfo(s, out, err);
- }
-
- // Should not exit on a signal: not expecting SIGSEGV, SIGABRT, etc.
- return s.ToString().find(kExitOnSignalStr) == string::npos;
- };
-
- CountDownLatch start_synchronizer(1);
- vector<thread> concurrent_runners;
- for (auto i = 0; i < 5; ++i) {
- concurrent_runners.emplace_back([&]() {
- start_synchronizer.Wait();
- CHECK(runner_func());
- });
- }
-
- // Run rebalancers concurrently and wait for their completion.
- start_synchronizer.CountDown();
- for (auto& runner : concurrent_runners) {
- runner.join();
- }
-
- NO_FATALS(cluster_->AssertNoCrashes());
- NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
-
- {
- string out;
- string err;
- const auto s = RunKuduTool(tool_args, &out, &err);
- ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
- ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
- << "stderr: " << err;
- }
-
- // Next run should report the cluster as balanced and no replica movement
- // should be attempted: at least one run of the rebalancer prior to this
- // should succeed, so next run is about running the tool against already
- // balanced cluster.
- {
- string out;
- string err;
- const auto s = RunKuduTool(tool_args, &out, &err);
- ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
- ASSERT_STR_CONTAINS(out,
- "rebalancing is complete: cluster is balanced (moved 0 replicas)")
- << "stderr: " << err;
- }
-
- NO_FATALS(cluster_->AssertNoCrashes());
- NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
-}
-
-// The rebalancer should stop and exit upon detecting a tablet server that
-// went down. That's a simple and effective way of preventing concurrent replica
-// movement by the rebalancer and the automatic re-replication (the catalog
-// manager tries to move replicas from the unreachable tablet server).
-class TserverGoesDownDuringRebalancingTest : public RebalancingTest {
- public:
- TserverGoesDownDuringRebalancingTest() :
- RebalancingTest(5 /* num_tables */) {
- }
-};
-INSTANTIATE_TEST_CASE_P(, TserverGoesDownDuringRebalancingTest,
- ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
-TEST_P(TserverGoesDownDuringRebalancingTest, TserverDown) {
- if (!AllowSlowTests()) {
- LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
- return;
- }
-
- const vector<string> kTserverExtraFlags = {
- // Slow down tablet copy to make rebalancing step running longer
- // and become observable via tablet data states output by ksck.
- "--tablet_copy_download_file_inject_latency_ms=1500",
-
- "--follower_unavailable_considered_failed_sec=30",
- };
- NO_FATALS(Prepare(kTserverExtraFlags));
-
- // Pre-condition: 'kudu cluster ksck' should be happy with the cluster state
- // shortly after initial setup.
- ASSERT_EVENTUALLY([&]() {
- ASSERT_TOOL_OK(
- "cluster",
- "ksck",
- cluster_->master()->bound_rpc_addr().ToString()
- )
- });
-
- Random r(SeedRandom());
- const uint32_t shutdown_tserver_idx = r.Next() % num_tservers_;
-
- atomic<bool> run(true);
- // The thread that shuts down the selected tablet server.
- thread stopper([&]() {
- while (run && !IsRebalancingInProgress()) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
-
- // All right, it's time to stop the selected tablet server.
- cluster_->tablet_server(shutdown_tserver_idx)->Shutdown();
- });
- auto stopper_cleanup = MakeScopedCleanup([&]() {
- run = false;
- stopper.join();
- });
-
- {
- string out;
- string err;
- const auto s = RunKuduTool({
- "cluster",
- "rebalance",
- cluster_->master()->bound_rpc_addr().ToString(),
- // Limiting the number of replicas to move. This is to make the rebalancer
- // run longer, making sure the rebalancing is in progress when the tablet
- // server goes down.
- "--max_moves_per_server=1",
- }, &out, &err);
- ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
-
- // The rebalancer tool should not crash.
- ASSERT_STR_NOT_CONTAINS(s.ToString(), kExitOnSignalStr);
- ASSERT_STR_MATCHES(
- err, "Illegal state: tablet server .* \\(.*\\): "
- "unacceptable health status UNAVAILABLE");
- }
-
- run = false;
- stopper.join();
- stopper_cleanup.cancel();
-
- ASSERT_OK(cluster_->tablet_server(shutdown_tserver_idx)->Restart());
- NO_FATALS(cluster_->AssertNoCrashes());
-}
-
-// The rebalancer should continue working and complete rebalancing successfully
-// if a new tablet server is added while the cluster is being rebalanced.
-class TserverAddedDuringRebalancingTest : public RebalancingTest {
- public:
- TserverAddedDuringRebalancingTest()
- : RebalancingTest(10 /* num_tables */) {
- }
-};
-INSTANTIATE_TEST_CASE_P(, TserverAddedDuringRebalancingTest,
- ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
-TEST_P(TserverAddedDuringRebalancingTest, TserverStarts) {
- if (!AllowSlowTests()) {
- LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
- return;
- }
-
- const vector<string> kTserverExtraFlags = {
- // Slow down tablet copy to make rebalancing step running longer
- // and become observable via tablet data states output by ksck.
- "--tablet_copy_download_file_inject_latency_ms=1500",
-
- "--follower_unavailable_considered_failed_sec=30",
- };
- NO_FATALS(Prepare(kTserverExtraFlags));
-
- const vector<string> tool_args = {
- "cluster",
- "rebalance",
- cluster_->master()->bound_rpc_addr().ToString(),
- };
-
- atomic<bool> run(true);
- thread runner([&]() {
- while (run) {
- string out;
- string err;
- const auto s = RunKuduTool(tool_args, &out, &err);
- CHECK(s.ok()) << ToolRunInfo(s, out, err);
- }
- });
- auto runner_cleanup = MakeScopedCleanup([&]() {
- run = false;
- runner.join();
- });
-
- while (!IsRebalancingInProgress()) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
-
- // It's time to sneak in and add new tablet server.
- ASSERT_OK(cluster_->AddTabletServer());
- run = false;
- runner.join();
- runner_cleanup.cancel();
-
- // The rebalancer should not fail, and eventually, after a new tablet server
- // is added, the cluster should become balanced.
- ASSERT_EVENTUALLY([&]() {
- string out;
- string err;
- const auto s = RunKuduTool(tool_args, &out, &err);
- ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
- ASSERT_STR_CONTAINS(out,
- "rebalancing is complete: cluster is balanced (moved 0 replicas)")
- << "stderr: " << err;
- });
-
- NO_FATALS(cluster_->AssertNoCrashes());
- NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
-}
-
-// Run rebalancer in 'election storms' environment and make sure the rebalancer
-// does not exit prematurely or exhibit any other unexpected behavior.
-class RebalancingDuringElectionStormTest : public RebalancingTest {
-};
-INSTANTIATE_TEST_CASE_P(, RebalancingDuringElectionStormTest,
- ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
-TEST_P(RebalancingDuringElectionStormTest, RoundRobin) {
- if (!AllowSlowTests()) {
- LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
- return;
- }
-
- NO_FATALS(Prepare());
-
- atomic<bool> elector_run(true);
-#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
- // The timeout is a time-to-run for the stormy elector thread as well.
- // Making longer timeout for workload in case of TSAN/ASAN is not needed:
- // having everything generated written is not required.
- const auto timeout = MonoDelta::FromSeconds(5);
-#else
- const auto timeout = MonoDelta::FromSeconds(10);
-#endif
- const auto start_time = MonoTime::Now();
- thread elector([&]() {
- // Mininum viable divider for modulo ('%') to allow the result to grow by
- // the rules below.
- auto max_sleep_ms = 2.0;
- while (elector_run && MonoTime::Now() < start_time + timeout) {
- for (const auto& e : tablet_servers_) {
- const auto& ts_uuid = e.first;
- const auto* ts = e.second;
- vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
- auto const s = itest::ListTablets(ts, timeout, &tablets);
- if (!s.ok()) {
- LOG(WARNING) << ts_uuid << ": failed to get tablet list :"
- << s.ToString();
- continue;
- }
- consensus::ConsensusServiceProxy proxy(
- cluster_->messenger(),
- cluster_->tablet_server_by_uuid(ts_uuid)->bound_rpc_addr(),
- "tserver " + ts_uuid);
- for (const auto& tablet : tablets) {
- const auto& tablet_id = tablet.tablet_status().tablet_id();
- consensus::RunLeaderElectionRequestPB req;
- req.set_tablet_id(tablet_id);
- req.set_dest_uuid(ts_uuid);
- rpc::RpcController rpc;
- rpc.set_timeout(timeout);
- consensus::RunLeaderElectionResponsePB resp;
- WARN_NOT_OK(proxy.RunLeaderElection(req, &resp, &rpc),
- Substitute("failed to start election for tablet $0",
- tablet_id));
- }
- if (!elector_run || start_time + timeout <= MonoTime::Now()) {
- break;
- }
- auto sleep_ms = rand() % static_cast<int>(max_sleep_ms);
- SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
- max_sleep_ms = std::min(max_sleep_ms * 1.1, 2000.0);
- }
- }
- });
- auto elector_cleanup = MakeScopedCleanup([&]() {
- elector_run = false;
- elector.join();
- });
-
-#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
- vector<unique_ptr<TestWorkload>> workloads;
- for (auto i = 0; i < num_tables_; ++i) {
- const string table_name = Substitute(table_name_pattern_, i);
- // The workload is light (1 thread, 1 op batches) and lenient to failures.
- unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get()));
- workload->set_table_name(table_name);
- workload->set_num_replicas(rep_factor_);
- workload->set_num_write_threads(1);
- workload->set_write_batch_size(1);
- workload->set_write_timeout_millis(timeout.ToMilliseconds());
- workload->set_already_present_allowed(true);
- workload->set_remote_error_allowed(true);
- workload->set_timeout_allowed(true);
- workload->Setup();
- workload->Start();
- workloads.emplace_back(std::move(workload));
- }
-#endif
-
- const vector<string> tool_args = {
- "cluster",
- "rebalance",
- cluster_->master()->bound_rpc_addr().ToString(),
- };
-
- while (MonoTime::Now() < start_time + timeout) {
- // Rebalancer should not report any errors even if it's an election storm
- // unless a tablet server is reported as unavailable by ksck: the latter
- // usually happens because GetConsensusState requests are dropped due to
- // backpressure.
- string out;
- string err;
- const Status s = RunKuduTool(tool_args, &out, &err);
- if (s.ok()) {
- ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
- << ToolRunInfo(s, out, err);
- } else {
- ASSERT_STR_CONTAINS(err, "unacceptable health status UNAVAILABLE")
- << ToolRunInfo(s, out, err);
- }
- }
-
- elector_run = false;
- elector.join();
- elector_cleanup.cancel();
-#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
- for (auto& workload : workloads) {
- workload->StopAndJoin();
- }
-#endif
-
- // There might be some re-replication started as a result of election storm,
- // etc. Eventually, the system should heal itself and 'kudu cluster ksck'
- // should report no issues.
- ASSERT_EVENTUALLY([&]() {
- ASSERT_TOOL_OK(
- "cluster",
- "ksck",
- cluster_->master()->bound_rpc_addr().ToString()
- )
- });
-
- // The rebalancer should successfully rebalance the cluster after ksck
- // reported 'all is well'.
- {
- string out;
- string err;
- const Status s = RunKuduTool(tool_args, &out, &err);
- ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
- ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
- << ToolRunInfo(s, out, err);
- }
-
- NO_FATALS(cluster_->AssertNoCrashes());
-}
-
-// A test to verify how the rebalancer handles replicas of single-replica
-// tablets in case of various values of the '--move_single_replicas' flag
-// and replica management schemes.
-class RebalancerAndSingleReplicaTablets :
- public AdminCliTest,
- public ::testing::WithParamInterface<tuple<string, Kudu1097>> {
-};
-INSTANTIATE_TEST_CASE_P(, RebalancerAndSingleReplicaTablets,
- ::testing::Combine(::testing::Values("auto", "enabled", "disabled"),
- ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)));
-TEST_P(RebalancerAndSingleReplicaTablets, SingleReplicasStayOrMove) {
- if (!AllowSlowTests()) {
- LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
- return;
- }
-
- constexpr auto kRepFactor = 1;
- constexpr auto kNumTservers = 2 * (kRepFactor + 1);
- constexpr auto kNumTables = kNumTservers;
- constexpr auto kTserverUnresponsiveMs = 3000;
- const auto& param = GetParam();
- const auto& move_single_replica = std::get<0>(param);
- const auto is_343_scheme = (std::get<1>(param) == Kudu1097::Enable);
- const string table_name_pattern = "rebalance_test_table_$0";
- const vector<string> master_flags = {
- Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
- Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs),
- };
- const vector<string> tserver_flags = {
- Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
- };
-
- FLAGS_num_tablet_servers = kNumTservers;
- FLAGS_num_replicas = kRepFactor;
- NO_FATALS(BuildAndStart(tserver_flags, master_flags));
-
- // Keep running only (kRepFactor + 1) tablet servers and shut down the rest.
- for (auto i = kRepFactor + 1; i < kNumTservers; ++i) {
- cluster_->tablet_server(i)->Shutdown();
- }
-
- // Wait for the catalog manager to understand that only (kRepFactor + 1)
- // tablet servers are available.
- SleepFor(MonoDelta::FromMilliseconds(5 * kTserverUnresponsiveMs / 4));
-
- // Create few tables with their tablet replicas landing only on those
- // (kRepFactor + 1) running tablet servers.
- KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
- for (auto i = 0; i < kNumTables; ++i) {
- const string table_name = Substitute(table_name_pattern, i);
- unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
- ASSERT_OK(table_creator->table_name(table_name)
- .schema(&client_schema)
- .add_hash_partitions({ "key" }, 3)
- .num_replicas(kRepFactor)
- .Create());
- ASSERT_TOOL_OK(
- "perf",
- "loadgen",
- cluster_->master()->bound_rpc_addr().ToString(),
- Substitute("--table_name=$0", table_name),
- // Don't need much data in there.
- "--num_threads=1",
- "--num_rows_per_thread=1",
- );
- }
- for (auto i = kRepFactor + 1; i < kNumTservers; ++i) {
- ASSERT_OK(cluster_->tablet_server(i)->Restart());
- }
-
- string out;
- string err;
- const Status s = RunKuduTool({
- "cluster",
- "rebalance",
- cluster_->master()->bound_rpc_addr().ToString(),
- Substitute("--move_single_replicas=$0", move_single_replica),
- }, &out, &err);
- ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
- ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
- << "stderr: " << err;
- if (move_single_replica == "enabled" ||
- (move_single_replica == "auto" && is_343_scheme)) {
- // Should move appropriate replicas of single-replica tablets.
- ASSERT_STR_NOT_CONTAINS(out,
- "rebalancing is complete: cluster is balanced (moved 0 replicas)")
- << "stderr: " << err;
- ASSERT_STR_NOT_CONTAINS(err, "has single replica, skipping");
- } else {
- ASSERT_STR_CONTAINS(out,
- "rebalancing is complete: cluster is balanced (moved 0 replicas)")
- << "stderr: " << err;
- ASSERT_STR_MATCHES(err, "tablet .* of table '.*' (.*) has single replica, skipping");
- }
-
- NO_FATALS(cluster_->AssertNoCrashes());
- ClusterVerifier v(cluster_.get());
- NO_FATALS(v.CheckCluster());
-}
-
} // namespace tools
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/66b0a4cc/src/kudu/tools/rebalancer_tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
new file mode 100644
index 0000000..3e6c8d1
--- /dev/null
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -0,0 +1,1148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <atomic>
+#include <cstdint>
+#include <cstdlib>
+#include <iterator>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/client.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tablet/tablet.pb.h"
+#include "kudu/tools/tool_test_util.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+class Schema;
+} // namespace kudu
+
+DECLARE_int32(num_replicas);
+DECLARE_int32(num_tablet_servers);
+
+using kudu::client::KuduClient;
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduTableAlterer;
+using kudu::client::KuduTableCreator;
+using kudu::itest::TabletServerMap;
+using kudu::tserver::ListTabletsResponsePB;
+using std::atomic;
+using std::back_inserter;
+using std::copy;
+using std::endl;
+using std::ostringstream;
+using std::string;
+using std::thread;
+using std::tuple;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace tools {
+
+ // Helper to format info when a tool action fails.
+static string ToolRunInfo(const Status& s, const string& out, const string& err) {
+ ostringstream str;
+ str << s.ToString() << endl;
+ str << "stdout: " << out << endl;
+ str << "stderr: " << err << endl;
+ return str.str();
+}
+
+// Helper macro for tool tests. Use as follows:
+//
+// ASSERT_TOOL_OK("cluster", "ksck", master_addrs);
+//
+// The failure Status result of RunKuduTool is usually useless, so this macro
+// also logs the stdout and stderr in case of failure, for easier diagnosis.
+// TODO(wdberkeley): Add a macro to retrieve stdout or stderr, or a macro for
+// when one of those should match a string.
+#define ASSERT_TOOL_OK(...) do { \
+ const vector<string>& _args{__VA_ARGS__}; \
+ string _out, _err; \
+ const Status& _s = RunKuduTool(_args, &_out, &_err); \
+ if (_s.ok()) { \
+ SUCCEED(); \
+ } else { \
+ FAIL() << ToolRunInfo(_s, _out, _err); \
+ } \
+} while (0);
+
+class AdminCliTest : public tserver::TabletServerIntegrationTestBase {
+};
+
+TEST_F(AdminCliTest, RebalancerReportOnly) {
+ static const char kReferenceOutput[] =
+ R"***(Per-server replica distribution summary:
+ Statistic | Value
+-----------------------+----------
+ Minimum Replica Count | 0
+ Maximum Replica Count | 1
+ Average Replica Count | 0.600000
+
+Per-table replica distribution summary:
+ Replica Skew | Value
+--------------+----------
+ Minimum | 1
+ Maximum | 1
+ Average | 1.000000)***";
+
+ FLAGS_num_tablet_servers = 5;
+ NO_FATALS(BuildAndStart());
+
+ string out;
+ string err;
+ Status s = RunKuduTool({
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ "--report_only",
+ }, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ // The rebalancer should report on tablet replica distribution. The output
+ // should match the reference report: the distribution of the replicas
+ // is 100% repeatable given the number of tables created by the test,
+ // the replication factor and the number of tablet servers.
+ ASSERT_STR_CONTAINS(out, kReferenceOutput);
+ // The actual rebalancing should not run.
+ ASSERT_STR_NOT_CONTAINS(out, "rebalancing is complete:")
+ << ToolRunInfo(s, out, err);
+}
+
+// Make sure the rebalancer doesn't start if a tablet server is down.
+class RebalanceStartCriteriaTest :
+ public AdminCliTest,
+ public ::testing::WithParamInterface<Kudu1097> {
+};
+INSTANTIATE_TEST_CASE_P(, RebalanceStartCriteriaTest,
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
+ const bool is_343_scheme = (GetParam() == Kudu1097::Enable);
+ const vector<string> kMasterFlags = {
+ Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+ };
+ const vector<string> kTserverFlags = {
+ Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+ };
+
+ FLAGS_num_tablet_servers = 5;
+ NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+
+ // Shutdown one of the tablet servers.
+ HostPort ts_host_port;
+ {
+ auto* ts = cluster_->tablet_server(0);
+ ASSERT_NE(nullptr, ts);
+ ts_host_port = ts->bound_rpc_hostport();
+ ts->Shutdown();
+ }
+
+ string out;
+ string err;
+ Status s = RunKuduTool({
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString()
+ }, &out, &err);
+ ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
+ const auto err_msg_pattern = Substitute(
+ "Illegal state: tablet server .* \\($0\\): "
+ "unacceptable health status UNAVAILABLE",
+ ts_host_port.ToString());
+ ASSERT_STR_MATCHES(err, err_msg_pattern);
+}
+
+// Create tables with unbalanced replica distribution: useful in
+// rebalancer-related tests.
+static Status CreateUnbalancedTables(
+ cluster::ExternalMiniCluster* cluster,
+ client::KuduClient* client,
+ const Schema& table_schema,
+ const string& table_name_pattern,
+ int num_tables,
+ int rep_factor,
+ int tserver_idx_from,
+ int tserver_num,
+ int tserver_unresponsive_ms) {
+ // Keep running only some tablet servers and shut down the rest.
+ for (auto i = tserver_idx_from; i < tserver_num; ++i) {
+ cluster->tablet_server(i)->Shutdown();
+ }
+
+ // Wait for the catalog manager to understand that not all tablet servers
+ // are available.
+ SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
+
+ // Create tables with their tablet replicas landing only on the tablet servers
+ // which are up and running.
+ KuduSchema client_schema(client::KuduSchemaFromSchema(table_schema));
+ for (auto i = 0; i < num_tables; ++i) {
+ const string table_name = Substitute(table_name_pattern, i);
+ unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+ RETURN_NOT_OK(table_creator->table_name(table_name)
+ .schema(&client_schema)
+ .add_hash_partitions({ "key" }, 3)
+ .num_replicas(rep_factor)
+ .Create());
+ RETURN_NOT_OK(RunKuduTool({
+ "perf",
+ "loadgen",
+ cluster->master()->bound_rpc_addr().ToString(),
+ Substitute("--table_name=$0", table_name),
+ Substitute("--table_num_replicas=$0", rep_factor),
+ "--string_fixed=unbalanced_tables_test",
+ }));
+ }
+
+ for (auto i = tserver_idx_from; i < tserver_num; ++i) {
+ RETURN_NOT_OK(cluster->tablet_server(i)->Restart());
+ }
+
+ return Status::OK();
+}
+
+// A test to verify that rebalancing works for both 3-4-3 and 3-2-3 replica
+// management schemes. During replica movement, a light workload is run against
+// every table being rebalanced. This test covers different replication factors.
+class RebalanceParamTest :
+ public AdminCliTest,
+ public ::testing::WithParamInterface<tuple<int, Kudu1097>> {
+};
+INSTANTIATE_TEST_CASE_P(, RebalanceParamTest,
+ ::testing::Combine(::testing::Values(1, 2, 3, 5),
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)));
+TEST_P(RebalanceParamTest, Rebalance) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ const auto& param = GetParam();
+ const auto kRepFactor = std::get<0>(param);
+ const auto is_343_scheme = (std::get<1>(param) == Kudu1097::Enable);
+ constexpr auto kNumTservers = 7;
+ constexpr auto kNumTables = 5;
+ const string table_name_pattern = "rebalance_test_table_$0";
+ constexpr auto kTserverUnresponsiveMs = 3000;
+ const auto timeout = MonoDelta::FromSeconds(30);
+ const vector<string> kMasterFlags = {
+ "--allow_unsafe_replication_factor",
+ Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+ Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs),
+ };
+ const vector<string> kTserverFlags = {
+ Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+ };
+
+ FLAGS_num_tablet_servers = kNumTservers;
+ FLAGS_num_replicas = kRepFactor;
+ NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+
+ ASSERT_OK(CreateUnbalancedTables(
+ cluster_.get(), client_.get(), schema_, table_name_pattern, kNumTables,
+ kRepFactor, kRepFactor + 1, kNumTservers, kTserverUnresponsiveMs));
+
+ // Workloads aren't run for 3-2-3 replica movement with RF = 1 because
+ // the tablet is unavailable during the move until the target voter replica
+ // is up and running. That might take some time, and to avoid flakiness or
+ // setting longer timeouts, RF=1 replicas are moved with no concurrent
+ // workload running.
+ //
+ // TODO(aserbin): clarify why even with 3-4-3 it's a bit flaky now.
+ vector<unique_ptr<TestWorkload>> workloads;
+ //if (kRepFactor > 1 || is_343_scheme) {
+ if (kRepFactor > 1) {
+ for (auto i = 0; i < kNumTables; ++i) {
+ const string table_name = Substitute(table_name_pattern, i);
+ // The workload is light (1 thread, 1 op batches) so that new replicas
+ // bootstrap and converge quickly.
+ unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get()));
+ workload->set_table_name(table_name);
+ workload->set_num_replicas(kRepFactor);
+ workload->set_num_write_threads(1);
+ workload->set_write_batch_size(1);
+ workload->set_write_timeout_millis(timeout.ToMilliseconds());
+ workload->set_already_present_allowed(true);
+ workload->Setup();
+ workload->Start();
+ workloads.emplace_back(std::move(workload));
+ }
+ }
+
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ "--move_single_replicas=enabled",
+ };
+
+ {
+ string out;
+ string err;
+ const Status s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << "stderr: " << err;
+ }
+
+ // Next run should report the cluster as balanced and no replica movement
+ // should be attempted.
+ {
+ string out;
+ string err;
+ const Status s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out,
+ "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+ << "stderr: " << err;
+ }
+
+ for (auto& workload : workloads) {
+ workload->StopAndJoin();
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+
+ // Now add a new tablet server into the cluster and make sure the rebalancer
+ // will re-distribute replicas.
+ ASSERT_OK(cluster_->AddTabletServer());
+ {
+ string out;
+ string err;
+ const Status s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << "stderr: " << err;
+ // The cluster was un-balanced, so many replicas should have been moved.
+ ASSERT_STR_NOT_CONTAINS(out, "(moved 0 replicas)");
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+}
+
+// Common base for the rebalancer-related test below.
+class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
+ public:
+ explicit RebalancingTest(int num_tables = 10,
+ int rep_factor = 3,
+ int num_tservers = 8,
+ int tserver_unresponsive_ms = 3000)
+ : num_tables_(num_tables),
+ rep_factor_(rep_factor),
+ num_tservers_(num_tservers),
+ tserver_unresponsive_ms_(tserver_unresponsive_ms) {
+ master_flags_ = {
+ Substitute("--tserver_unresponsive_timeout_ms=$0",
+ tserver_unresponsive_ms_),
+ };
+ }
+
+ virtual bool is_343_scheme() const = 0;
+
+ protected:
+ static const char* const kExitOnSignalStr;
+ static const char* const kTableNamePattern;
+
+ void Prepare(const vector<string>& extra_tserver_flags = {},
+ const vector<string>& extra_master_flags = {}) {
+ const auto& scheme_flag = Substitute(
+ "--raft_prepare_replacement_before_eviction=$0", is_343_scheme());
+ master_flags_.push_back(scheme_flag);
+ tserver_flags_.push_back(scheme_flag);
+
+ copy(extra_tserver_flags.begin(), extra_tserver_flags.end(),
+ back_inserter(tserver_flags_));
+ copy(extra_master_flags.begin(), extra_master_flags.end(),
+ back_inserter(master_flags_));
+
+ FLAGS_num_tablet_servers = num_tservers_;
+ FLAGS_num_replicas = rep_factor_;
+ NO_FATALS(BuildAndStart(tserver_flags_, master_flags_));
+
+ ASSERT_OK(CreateUnbalancedTables(
+ cluster_.get(), client_.get(), schema_, kTableNamePattern,
+ num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
+ tserver_unresponsive_ms_));
+ }
+
+ // When the rebalancer starts moving replicas, ksck detects corruption
+ // (that's why RuntimeError), seeing affected tables as non-healthy
+ // with data state of corresponding tablets as TABLET_DATA_COPYING. If using
+ // this method, it's a good idea to inject some latency into tablet copying
+ // to be able to spot the TABLET_DATA_COPYING state, see the
+ // '--tablet_copy_download_file_inject_latency_ms' flag for tservers.
+ bool IsRebalancingInProgress() {
+ string out;
+ const auto s = RunKuduTool({
+ "cluster",
+ "ksck",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ }, &out);
+ return s.IsRuntimeError() &&
+ out.find("Data state: TABLET_DATA_COPYING") != string::npos;
+ }
+
+ const int num_tables_;
+ const int rep_factor_;
+ const int num_tservers_;
+ const int tserver_unresponsive_ms_;
+ vector<string> tserver_flags_;
+ vector<string> master_flags_;
+};
+const char* const RebalancingTest::kExitOnSignalStr = "kudu: process exited on signal";
+const char* const RebalancingTest::kTableNamePattern = "rebalance_test_table_$0";
+
+typedef testing::WithParamInterface<Kudu1097> Kudu1097ParamTest;
+
+// Make sure the rebalancer is able to do its job if running concurrently
+// with DDL activity on the cluster.
+class DDLDuringRebalancingTest : public RebalancingTest,
+ public Kudu1097ParamTest {
+ public:
+ DDLDuringRebalancingTest()
+ : RebalancingTest(/*num_tables=*/ 20) {
+ }
+
+ bool is_343_scheme() const override {
+ return GetParam() == Kudu1097::Enable;
+ }
+};
+INSTANTIATE_TEST_CASE_P(, DDLDuringRebalancingTest,
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(DDLDuringRebalancingTest, TablesCreatedAndDeletedDuringRebalancing) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ NO_FATALS(Prepare());
+
+ // The latch that controls the lifecycle of the concurrent DDL activity.
+ CountDownLatch run_latch(1);
+
+ thread creator([&]() {
+ KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
+ for (auto idx = 0; ; ++idx) {
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
+ break;
+ }
+ const string table_name = Substitute("rebalancer_extra_table_$0", idx++);
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ CHECK_OK(table_creator->table_name(table_name)
+ .schema(&client_schema)
+ .add_hash_partitions({ "key" }, 3)
+ .num_replicas(rep_factor_)
+ .Create());
+ }
+ });
+ auto creator_cleanup = MakeScopedCleanup([&]() {
+ run_latch.CountDown();
+ creator.join();
+ });
+
+ thread deleter([&]() {
+ for (auto idx = 0; idx < num_tables_; ++idx) {
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
+ break;
+ }
+ CHECK_OK(client_->DeleteTable(Substitute(kTableNamePattern, idx++)));
+ }
+ });
+ auto deleter_cleanup = MakeScopedCleanup([&]() {
+ run_latch.CountDown();
+ deleter.join();
+ });
+
+ thread alterer([&]() {
+ const string kTableName = "rebalancer_dynamic_table";
+ const string kNewTableName = "rebalancer_dynamic_table_new_name";
+ while (true) {
+ // Create table.
+ {
+ KuduSchema schema;
+ KuduSchemaBuilder builder;
+ builder.AddColumn("key")->Type(KuduColumnSchema::INT64)->
+ NotNull()->
+ PrimaryKey();
+ builder.AddColumn("a")->Type(KuduColumnSchema::INT64);
+ CHECK_OK(builder.Build(&schema));
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ CHECK_OK(table_creator->table_name(kTableName)
+ .schema(&schema)
+ .set_range_partition_columns({})
+ .num_replicas(rep_factor_)
+ .Create());
+ }
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+ break;
+ }
+
+ // Drop a column.
+ {
+ unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
+ alt->DropColumn("a");
+ CHECK_OK(alt->Alter());
+ }
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+ break;
+ }
+
+ // Add back the column with different type.
+ {
+ unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
+ alt->AddColumn("a")->Type(KuduColumnSchema::STRING);
+ CHECK_OK(alt->Alter());
+ }
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+ break;
+ }
+
+ // Rename the table.
+ {
+ unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
+ alt->RenameTo(kNewTableName);
+ CHECK_OK(alt->Alter());
+ }
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+ break;
+ }
+
+ // Drop the renamed table.
+ CHECK_OK(client_->DeleteTable(kNewTableName));
+ if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
+ break;
+ }
+ }
+ });
+ auto alterer_cleanup = MakeScopedCleanup([&]() {
+ run_latch.CountDown();
+ alterer.join();
+ });
+
+ thread timer([&]() {
+ SleepFor(MonoDelta::FromSeconds(30));
+ run_latch.CountDown();
+ });
+ auto timer_cleanup = MakeScopedCleanup([&]() {
+ timer.join();
+ });
+
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ };
+
+ // Run the rebalancer concurrently with the DDL operations. The second run
+ // of the rebalancer (the second run starts after joining the timer thread)
+ // is necessary to balance the cluster after the DDL activity stops: that's
+ // the easiest way to make sure the rebalancer will take into account
+ // all DDL changes that happened.
+ //
+ // The signal to terminate the DDL activity (done via run_latch.CountDown())
+ // is sent from a separate timer thread instead of doing SleepFor() after
+ // the first run of the rebalancer followed by run_latch.CountDown().
+ // That's to avoid dependency on the rebalancer behavior if it spots on-going
+ // DDL activity and continues running over and over again.
+ for (auto i = 0; i < 2; ++i) {
+ if (i == 1) {
+ timer.join();
+ timer_cleanup.cancel();
+
+ // Wait for all the DDL activity to complete.
+ alterer.join();
+ alterer_cleanup.cancel();
+
+ deleter.join();
+ deleter_cleanup.cancel();
+
+ creator.join();
+ creator_cleanup.cancel();
+ }
+
+ string out;
+ string err;
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << "stderr: " << err;
+ }
+
+ // Next (3rd) run should report the cluster as balanced and
+ // no replica movement should be attempted.
+ {
+ string out;
+ string err;
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out,
+ "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+ << "stderr: " << err;
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+}
+
+// Make sure it's safe to run multiple rebalancers concurrently. The rebalancers
+// might report errors, but they should not get stuck and the cluster should
+// remain in good shape (i.e. no crashes, no data inconsistencies). Re-running a
+// single rebalancer session again should bring the cluster to a balanced state.
+class ConcurrentRebalancersTest : public RebalancingTest,
+ public Kudu1097ParamTest {
+ public:
+ ConcurrentRebalancersTest()
+ : RebalancingTest(/*num_tables=*/ 10) {
+ }
+
+ bool is_343_scheme() const override {
+ return GetParam() == Kudu1097::Enable;
+ }
+};
+INSTANTIATE_TEST_CASE_P(, ConcurrentRebalancersTest,
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(ConcurrentRebalancersTest, TwoConcurrentRebalancers) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ NO_FATALS(Prepare());
+
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ };
+
+ const auto runner_func = [&]() {
+ string out;
+ string err;
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ if (!s.ok()) {
+ // One might expect a bad status returned: e.g., due to some race so
+ // the rebalancer didn't able to make progress for more than
+ // --max_staleness_interval_sec, etc.
+ LOG(INFO) << "rebalancer run info: " << ToolRunInfo(s, out, err);
+ }
+
+ // Should not exit on a signal: not expecting SIGSEGV, SIGABRT, etc.
+ return s.ToString().find(kExitOnSignalStr) == string::npos;
+ };
+
+ const constexpr auto kNumConcurrentRunners = 5;
+ CountDownLatch start_synchronizer(1);
+ vector<thread> concurrent_runners;
+ concurrent_runners.reserve(kNumConcurrentRunners);
+ for (auto i = 0; i < kNumConcurrentRunners; ++i) {
+ concurrent_runners.emplace_back([&]() {
+ start_synchronizer.Wait();
+ CHECK(runner_func());
+ });
+ }
+
+ // Run rebalancers concurrently and wait for their completion.
+ start_synchronizer.CountDown();
+ for (auto& runner : concurrent_runners) {
+ runner.join();
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+
+ {
+ string out;
+ string err;
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << "stderr: " << err;
+ }
+
+ // Next run should report the cluster as balanced and no replica movement
+ // should be attempted: at least one run of the rebalancer prior to this
+ // should succeed, so next run is about running the tool against already
+ // balanced cluster.
+ {
+ string out;
+ string err;
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out,
+ "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+ << "stderr: " << err;
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+}
+
+// The rebalancer should stop and exit upon detecting a tablet server that
+// went down. That's a simple and effective way of preventing concurrent replica
+// movement by the rebalancer and the automatic re-replication (the catalog
+// manager tries to move replicas from the unreachable tablet server).
+class TserverGoesDownDuringRebalancingTest : public RebalancingTest,
+ public Kudu1097ParamTest {
+ public:
+ TserverGoesDownDuringRebalancingTest() :
+ RebalancingTest(/*num_tables=*/ 5) {
+ }
+
+ bool is_343_scheme() const override {
+ return GetParam() == Kudu1097::Enable;
+ }
+};
+INSTANTIATE_TEST_CASE_P(, TserverGoesDownDuringRebalancingTest,
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(TserverGoesDownDuringRebalancingTest, TserverDown) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ const vector<string> kTserverExtraFlags = {
+ // Slow down tablet copy to make rebalancing step running longer
+ // and become observable via tablet data states output by ksck.
+ "--tablet_copy_download_file_inject_latency_ms=1500",
+
+ "--follower_unavailable_considered_failed_sec=30",
+ };
+ NO_FATALS(Prepare(kTserverExtraFlags));
+
+ // Pre-condition: 'kudu cluster ksck' should be happy with the cluster state
+ // shortly after initial setup.
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_TOOL_OK(
+ "cluster",
+ "ksck",
+ cluster_->master()->bound_rpc_addr().ToString()
+ )
+ });
+
+ Random r(SeedRandom());
+ const uint32_t shutdown_tserver_idx = r.Next() % num_tservers_;
+
+ atomic<bool> run(true);
+ // The thread that shuts down the selected tablet server.
+ thread stopper([&]() {
+ while (run && !IsRebalancingInProgress()) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+
+ // All right, it's time to stop the selected tablet server.
+ cluster_->tablet_server(shutdown_tserver_idx)->Shutdown();
+ });
+ auto stopper_cleanup = MakeScopedCleanup([&]() {
+ run = false;
+ stopper.join();
+ });
+
+ {
+ string out;
+ string err;
+ const auto s = RunKuduTool({
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ // Limiting the number of replicas to move. This is to make the rebalancer
+ // run longer, making sure the rebalancing is in progress when the tablet
+ // server goes down.
+ "--max_moves_per_server=1",
+ }, &out, &err);
+ ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
+
+ // The rebalancer tool should not crash.
+ ASSERT_STR_NOT_CONTAINS(s.ToString(), kExitOnSignalStr);
+ ASSERT_STR_MATCHES(
+ err, "Illegal state: tablet server .* \\(.*\\): "
+ "unacceptable health status UNAVAILABLE");
+ }
+
+ run = false;
+ stopper.join();
+ stopper_cleanup.cancel();
+
+ ASSERT_OK(cluster_->tablet_server(shutdown_tserver_idx)->Restart());
+ NO_FATALS(cluster_->AssertNoCrashes());
+}
+
+// The rebalancer should continue working and complete rebalancing successfully
+// if a new tablet server is added while the cluster is being rebalanced.
+class TserverAddedDuringRebalancingTest : public RebalancingTest,
+ public Kudu1097ParamTest {
+ public:
+ TserverAddedDuringRebalancingTest()
+ : RebalancingTest(/*num_tables=*/ 10) {
+ }
+
+ bool is_343_scheme() const override {
+ return GetParam() == Kudu1097::Enable;
+ }
+};
+INSTANTIATE_TEST_CASE_P(, TserverAddedDuringRebalancingTest,
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(TserverAddedDuringRebalancingTest, TserverStarts) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ const vector<string> kTserverExtraFlags = {
+ // Slow down tablet copy to make rebalancing step running longer
+ // and become observable via tablet data states output by ksck.
+ "--tablet_copy_download_file_inject_latency_ms=1500",
+
+ "--follower_unavailable_considered_failed_sec=30",
+ };
+ NO_FATALS(Prepare(kTserverExtraFlags));
+
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ };
+
+ atomic<bool> run(true);
+ thread runner([&]() {
+ while (run) {
+ string out;
+ string err;
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ CHECK(s.ok()) << ToolRunInfo(s, out, err);
+ }
+ });
+ auto runner_cleanup = MakeScopedCleanup([&]() {
+ run = false;
+ runner.join();
+ });
+
+ while (!IsRebalancingInProgress()) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+
+ // It's time to sneak in and add new tablet server.
+ ASSERT_OK(cluster_->AddTabletServer());
+ run = false;
+ runner.join();
+ runner_cleanup.cancel();
+
+ // The rebalancer should not fail, and eventually, after a new tablet server
+ // is added, the cluster should become balanced.
+ ASSERT_EVENTUALLY([&]() {
+ string out;
+ string err;
+ const auto s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out,
+ "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+ << "stderr: " << err;
+ });
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
+}
+
+// Run rebalancer in 'election storms' environment and make sure the rebalancer
+// does not exit prematurely or exhibit any other unexpected behavior.
+class RebalancingDuringElectionStormTest : public RebalancingTest,
+ public Kudu1097ParamTest {
+ public:
+ bool is_343_scheme() const override {
+ return GetParam() == Kudu1097::Enable;
+ }
+};
+INSTANTIATE_TEST_CASE_P(, RebalancingDuringElectionStormTest,
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(RebalancingDuringElectionStormTest, RoundRobin) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ NO_FATALS(Prepare());
+
+ atomic<bool> elector_run(true);
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
+ // The timeout is a time-to-run for the stormy elector thread as well.
+ // Making longer timeout for workload in case of TSAN/ASAN is not needed:
+ // having everything generated written is not required.
+ const auto timeout = MonoDelta::FromSeconds(5);
+#else
+ const auto timeout = MonoDelta::FromSeconds(10);
+#endif
+ const auto start_time = MonoTime::Now();
+ thread elector([&]() {
+ // Mininum viable divider for modulo ('%') to allow the result to grow by
+ // the rules below.
+ auto max_sleep_ms = 2.0;
+ while (elector_run && MonoTime::Now() < start_time + timeout) {
+ for (const auto& e : tablet_servers_) {
+ const auto& ts_uuid = e.first;
+ const auto* ts = e.second;
+ vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+ auto const s = itest::ListTablets(ts, timeout, &tablets);
+ if (!s.ok()) {
+ LOG(WARNING) << ts_uuid << ": failed to get tablet list :"
+ << s.ToString();
+ continue;
+ }
+ consensus::ConsensusServiceProxy proxy(
+ cluster_->messenger(),
+ cluster_->tablet_server_by_uuid(ts_uuid)->bound_rpc_addr(),
+ "tserver " + ts_uuid);
+ for (const auto& tablet : tablets) {
+ const auto& tablet_id = tablet.tablet_status().tablet_id();
+ consensus::RunLeaderElectionRequestPB req;
+ req.set_tablet_id(tablet_id);
+ req.set_dest_uuid(ts_uuid);
+ rpc::RpcController rpc;
+ rpc.set_timeout(timeout);
+ consensus::RunLeaderElectionResponsePB resp;
+ WARN_NOT_OK(proxy.RunLeaderElection(req, &resp, &rpc),
+ Substitute("failed to start election for tablet $0",
+ tablet_id));
+ }
+ if (!elector_run || start_time + timeout <= MonoTime::Now()) {
+ break;
+ }
+ auto sleep_ms = rand() % static_cast<int>(max_sleep_ms);
+ SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ max_sleep_ms = std::min(max_sleep_ms * 1.1, 2000.0);
+ }
+ }
+ });
+ auto elector_cleanup = MakeScopedCleanup([&]() {
+ elector_run = false;
+ elector.join();
+ });
+
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
+ vector<unique_ptr<TestWorkload>> workloads;
+ for (auto i = 0; i < num_tables_; ++i) {
+ const string table_name = Substitute(kTableNamePattern, i);
+ // The workload is light (1 thread, 1 op batches) and lenient to failures.
+ unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get()));
+ workload->set_table_name(table_name);
+ workload->set_num_replicas(rep_factor_);
+ workload->set_num_write_threads(1);
+ workload->set_write_batch_size(1);
+ workload->set_write_timeout_millis(timeout.ToMilliseconds());
+ workload->set_already_present_allowed(true);
+ workload->set_remote_error_allowed(true);
+ workload->set_timeout_allowed(true);
+ workload->Setup();
+ workload->Start();
+ workloads.emplace_back(std::move(workload));
+ }
+#endif
+
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ };
+
+ while (MonoTime::Now() < start_time + timeout) {
+ // Rebalancer should not report any errors even if it's an election storm
+ // unless a tablet server is reported as unavailable by ksck: the latter
+ // usually happens because GetConsensusState requests are dropped due to
+ // backpressure.
+ string out;
+ string err;
+ const Status s = RunKuduTool(tool_args, &out, &err);
+ if (s.ok()) {
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << ToolRunInfo(s, out, err);
+ } else {
+ ASSERT_STR_CONTAINS(err, "unacceptable health status UNAVAILABLE")
+ << ToolRunInfo(s, out, err);
+ }
+ }
+
+ elector_run = false;
+ elector.join();
+ elector_cleanup.cancel();
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
+ for (auto& workload : workloads) {
+ workload->StopAndJoin();
+ }
+#endif
+
+ // There might be some re-replication started as a result of election storm,
+ // etc. Eventually, the system should heal itself and 'kudu cluster ksck'
+ // should report no issues.
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_TOOL_OK(
+ "cluster",
+ "ksck",
+ cluster_->master()->bound_rpc_addr().ToString()
+ )
+ });
+
+ // The rebalancer should successfully rebalance the cluster after ksck
+ // reported 'all is well'.
+ {
+ string out;
+ string err;
+ const Status s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << ToolRunInfo(s, out, err);
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+}
+
+// A test to verify how the rebalancer handles replicas of single-replica
+// tablets in case of various values of the '--move_single_replicas' flag
+// and replica management schemes.
+class RebalancerAndSingleReplicaTablets :
+ public AdminCliTest,
+ public ::testing::WithParamInterface<tuple<string, Kudu1097>> {
+};
+INSTANTIATE_TEST_CASE_P(, RebalancerAndSingleReplicaTablets,
+ ::testing::Combine(::testing::Values("auto", "enabled", "disabled"),
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)));
+TEST_P(RebalancerAndSingleReplicaTablets, SingleReplicasStayOrMove) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ constexpr auto kRepFactor = 1;
+ constexpr auto kNumTservers = 2 * (kRepFactor + 1);
+ constexpr auto kNumTables = kNumTservers;
+ constexpr auto kTserverUnresponsiveMs = 3000;
+ const auto& param = GetParam();
+ const auto& move_single_replica = std::get<0>(param);
+ const auto is_343_scheme = (std::get<1>(param) == Kudu1097::Enable);
+ const string table_name_pattern = "rebalance_test_table_$0";
+ const vector<string> master_flags = {
+ Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+ Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs),
+ };
+ const vector<string> tserver_flags = {
+ Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+ };
+
+ FLAGS_num_tablet_servers = kNumTservers;
+ FLAGS_num_replicas = kRepFactor;
+ NO_FATALS(BuildAndStart(tserver_flags, master_flags));
+
+ // Keep running only (kRepFactor + 1) tablet servers and shut down the rest.
+ for (auto i = kRepFactor + 1; i < kNumTservers; ++i) {
+ cluster_->tablet_server(i)->Shutdown();
+ }
+
+ // Wait for the catalog manager to understand that only (kRepFactor + 1)
+ // tablet servers are available.
+ SleepFor(MonoDelta::FromMilliseconds(5 * kTserverUnresponsiveMs / 4));
+
+ // Create few tables with their tablet replicas landing only on those
+ // (kRepFactor + 1) running tablet servers.
+ KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
+ for (auto i = 0; i < kNumTables; ++i) {
+ const string table_name = Substitute(table_name_pattern, i);
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ ASSERT_OK(table_creator->table_name(table_name)
+ .schema(&client_schema)
+ .add_hash_partitions({ "key" }, 3)
+ .num_replicas(kRepFactor)
+ .Create());
+ ASSERT_TOOL_OK(
+ "perf",
+ "loadgen",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ Substitute("--table_name=$0", table_name),
+ // Don't need much data in there.
+ "--num_threads=1",
+ "--num_rows_per_thread=1",
+ );
+ }
+ for (auto i = kRepFactor + 1; i < kNumTservers; ++i) {
+ ASSERT_OK(cluster_->tablet_server(i)->Restart());
+ }
+
+ string out;
+ string err;
+ const Status s = RunKuduTool({
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ Substitute("--move_single_replicas=$0", move_single_replica),
+ }, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << "stderr: " << err;
+ if (move_single_replica == "enabled" ||
+ (move_single_replica == "auto" && is_343_scheme)) {
+ // Should move appropriate replicas of single-replica tablets.
+ ASSERT_STR_NOT_CONTAINS(out,
+ "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+ << "stderr: " << err;
+ ASSERT_STR_NOT_CONTAINS(err, "has single replica, skipping");
+ } else {
+ ASSERT_STR_CONTAINS(out,
+ "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+ << "stderr: " << err;
+ ASSERT_STR_MATCHES(err, "tablet .* of table '.*' (.*) has single replica, skipping");
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+ ClusterVerifier v(cluster_.get());
+ NO_FATALS(v.CheckCluster());
+}
+
+} // namespace tools
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/66b0a4cc/src/kudu/tools/tool_test_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_test_util.h b/src/kudu/tools/tool_test_util.h
index 6925468..1d7eae7 100644
--- a/src/kudu/tools/tool_test_util.h
+++ b/src/kudu/tools/tool_test_util.h
@@ -27,6 +27,11 @@
namespace kudu {
namespace tools {
+enum class Kudu1097 {
+ Disable,
+ Enable,
+};
+
// Get full path to the top-level 'kudu' tool binary.
std::string GetKuduToolAbsolutePath();