You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2018/08/02 23:20:40 UTC
kudu git commit: [tools] run rebalancer during 'election storm'
Repository: kudu
Updated Branches:
refs/heads/master a7a850278 -> 703db4141
[tools] run rebalancer during 'election storm'
Added an integration test to run the rebalancer tool in an 'election
storm' environment. The rebalancer should run with no issues
unless a tablet server is reported as unavailable.
I ran the test 250 times in each build configuration and didn't spot any
failures.
Change-Id: Ic98684dbe55049bbc411513faa0b6bbaef20f434
Reviewed-on: http://gerrit.cloudera.org:8080/11107
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/703db414
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/703db414
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/703db414
Branch: refs/heads/master
Commit: 703db4141ca912193439636fb0931510f9b27de0
Parents: a7a8502
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue Jul 31 23:35:59 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Thu Aug 2 23:12:55 2018 +0000
----------------------------------------------------------------------
src/kudu/tools/kudu-admin-test.cc | 155 +++++++++++++++++++++++++++++++++
1 file changed, 155 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/703db414/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index 04c5af9..05ba159 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -19,6 +19,7 @@
#include <atomic>
#include <cstdint>
#include <cstdio>
+#include <cstdlib>
#include <deque>
#include <iterator>
#include <memory>
@@ -41,6 +42,7 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/quorum_util.h"
@@ -54,9 +56,12 @@
#include "kudu/integration-tests/ts_itest-base.h"
#include "kudu/master/master.pb.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet.pb.h"
#include "kudu/tools/tool_test_util.h"
#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
@@ -100,6 +105,7 @@ using kudu::itest::WaitUntilTabletInState;
using kudu::itest::WaitUntilTabletRunning;
using kudu::master::VOTER_REPLICA;
using kudu::pb_util::SecureDebugString;
+using kudu::tserver::ListTabletsResponsePB;
using std::atomic;
using std::back_inserter;
using std::copy;
@@ -2096,6 +2102,155 @@ TEST_P(TserverAddedDuringRebalancingTest, TserverStarts) {
NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
}
+// Fixture for running the rebalancer in an 'election storm' environment to make
+// sure it neither exits prematurely nor exhibits any other unexpected behavior.
+class RebalancingDuringElectionStormTest : public RebalancingTest {
+};
+INSTANTIATE_TEST_CASE_P(, RebalancingDuringElectionStormTest,
+ ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));  // one instantiation per Kudu1097 value
+TEST_P(RebalancingDuringElectionStormTest, RoundRobin) {  // rebalance while leader elections are being forced
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ NO_FATALS(Prepare());
+
+ atomic<bool> elector_run(true);  // set to false to ask the elector thread to stop
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
+ // The timeout doubles as the time-to-run for the stormy elector thread.
+ // A longer workload timeout under TSAN/ASAN is not needed: the test does
+ // not require all of the generated data to be written.
+ const auto timeout = MonoDelta::FromSeconds(5);
+#else
+ const auto timeout = MonoDelta::FromSeconds(10);
+#endif
+ const auto start_time = MonoTime::Now();
+ thread elector([&]() {  // requests a leader election for every tablet listed by every tablet server
+ // Minimum viable divisor for modulo ('%') that allows the result to grow by
+ // the rules below.
+ auto max_sleep_ms = 2.0;  // truncated to int and used as the modulo divisor for the random sleep
+ while (elector_run && MonoTime::Now() < start_time + timeout) {
+ for (const auto& e : tablet_servers_) {
+ const auto& ts_uuid = e.first;
+ const auto* ts = e.second;
+ vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+ auto const s = itest::ListTablets(ts, timeout, &tablets);
+ if (!s.ok()) {
+ LOG(WARNING) << ts_uuid << ": failed to get tablet list :"
+ << s.ToString();
+ continue;
+ }
+ consensus::ConsensusServiceProxy proxy(
+ cluster_->messenger(),
+ cluster_->tablet_server_by_uuid(ts_uuid)->bound_rpc_addr(),
+ "tserver " + ts_uuid);
+ for (const auto& tablet : tablets) {
+ const auto& tablet_id = tablet.tablet_status().tablet_id();
+ consensus::RunLeaderElectionRequestPB req;
+ req.set_tablet_id(tablet_id);
+ req.set_dest_uuid(ts_uuid);
+ rpc::RpcController rpc;
+ rpc.set_timeout(timeout);
+ consensus::RunLeaderElectionResponsePB resp;
+ WARN_NOT_OK(proxy.RunLeaderElection(req, &resp, &rpc),  // failures are only logged
+ Substitute("failed to start election for tablet $0",
+ tablet_id));
+ }
+ if (!elector_run || start_time + timeout <= MonoTime::Now()) {  // re-check between servers
+ break;
+ }
+ auto sleep_ms = rand() % static_cast<int>(max_sleep_ms);
+ SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ max_sleep_ms = std::min(max_sleep_ms * 1.1, 2000.0);  // grow the bound by 10%, capped at 2000
+ }
+ }
+ });
+ auto elector_cleanup = MakeScopedCleanup([&]() {  // intended to stop and join the elector if the test returns early
+ elector_run = false;
+ elector.join();
+ });
+
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
+ vector<unique_ptr<TestWorkload>> workloads;
+ for (auto i = 0; i < num_tables_; ++i) {
+ const string table_name = Substitute(table_name_pattern_, i);
+ // The workload is light (1 write thread, 1-op batches) and tolerates failures.
+ unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get()));
+ workload->set_table_name(table_name);
+ workload->set_num_replicas(rep_factor_);
+ workload->set_num_write_threads(1);
+ workload->set_write_batch_size(1);
+ workload->set_write_timeout_millis(timeout.ToMilliseconds());
+ workload->set_already_present_allowed(true);
+ workload->set_remote_error_allowed(true);
+ workload->set_timeout_allowed(true);
+ workload->Setup();
+ workload->Start();
+ workloads.emplace_back(std::move(workload));
+ }
+#endif
+
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ };
+
+ while (MonoTime::Now() < start_time + timeout) {  // keep re-running the tool while the storm lasts
+ // The rebalancer should not report any errors even during an election
+ // storm, unless ksck reports a tablet server as unavailable: the latter
+ // usually happens because GetConsensusState requests are dropped due to
+ // backpressure.
+ string out;
+ string err;
+ const Status s = RunKuduTool(tool_args, &out, &err);
+ if (s.ok()) {
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << ToolRunInfo(s, out, err);
+ } else {
+ ASSERT_STR_CONTAINS(err, "unacceptable health status UNAVAILABLE")
+ << ToolRunInfo(s, out, err);
+ }
+ }
+
+ elector_run = false;  // normal-path shutdown of the elector thread
+ elector.join();
+ elector_cleanup.cancel();  // the thread has just been joined above
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
+ for (auto& workload : workloads) {
+ workload->StopAndJoin();
+ }
+#endif
+
+ // There might be some re-replication started as a result of the election
+ // storm, etc. Eventually, the system should heal itself and
+ // 'kudu cluster ksck' should report no issues.
+ ASSERT_EVENTUALLY([&]() {
+ string out;
+ string err;
+ const auto s = RunKuduTool({
+ "cluster",
+ "ksck",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ }, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ });
+
+ // The rebalancer should successfully rebalance the cluster after ksck
+ // reported 'all is well'.
+ {
+ string out;
+ string err;
+ const Status s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << ToolRunInfo(s, out, err);
+ }
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+}
+
// A test to verify how the rebalancer handles replicas of single-replica
// tablets in case of various values of the '--move_single_replicas' flag
// and replica management schemes.