You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/08/28 00:41:15 UTC
[kudu] branch master updated: Create kudu/rebalance subdirectory
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new e2567b6 Create kudu/rebalance subdirectory
e2567b6 is described below
commit e2567b69ed3947f7b75a7d3bf2a7b65b3885bcec
Author: hannahvnguyen <ha...@cloudera.com>
AuthorDate: Wed Aug 21 14:00:45 2019 -0700
Create kudu/rebalance subdirectory
Moved and renamed structs used for rebalancing from
tools/ksck_results into rebalance/cluster_status.
Moved tools/rebalance_algo to rebalance/rebalance_algo.
Moved class Rebalancer from tools/rebalancer to rebalance/rebalancer.
Renamed tools/rebalancer to tools/rebalancer_tool.
Created RebalancerTool in tools/rebalancer_tool that inherits
from Rebalancer.
Moved corresponding tests into kudu/rebalance subdirectory.
Movement of code done to prepare for autorebalancer task in master.
No functional changes made.
Change-Id: Ie0242a019cb44517539da2878cf889ee0c511964
Reviewed-on: http://gerrit.cloudera.org:8080/14110
Tested-by: Andrew Wong <aw...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
CMakeLists.txt | 1 +
src/kudu/rebalance/CMakeLists.txt | 45 ++
src/kudu/rebalance/cluster_status.cc | 74 +++
src/kudu/rebalance/cluster_status.h | 234 +++++++++
.../placement_policy_util-test.cc | 10 +-
.../{tools => rebalance}/placement_policy_util.cc | 17 +-
.../{tools => rebalance}/placement_policy_util.h | 10 +-
src/kudu/{tools => rebalance}/rebalance-test.cc | 45 +-
.../{tools => rebalance}/rebalance_algo-test.cc | 13 +-
src/kudu/{tools => rebalance}/rebalance_algo.cc | 7 +-
src/kudu/{tools => rebalance}/rebalance_algo.h | 4 +-
src/kudu/rebalance/rebalancer.cc | 440 +++++++++++++++++
src/kudu/rebalance/rebalancer.h | 233 +++++++++
src/kudu/tools/CMakeLists.txt | 9 +-
src/kudu/tools/ksck-test.cc | 264 +++++-----
src/kudu/tools/ksck.cc | 118 ++---
src/kudu/tools/ksck.h | 10 +-
src/kudu/tools/ksck_remote.cc | 17 +-
src/kudu/tools/ksck_remote.h | 10 +-
src/kudu/tools/ksck_results.cc | 306 ++++++------
src/kudu/tools/ksck_results.h | 235 +--------
.../tools/{rebalancer.cc => rebalancer_tool.cc} | 539 ++++-----------------
src/kudu/tools/{rebalancer.h => rebalancer_tool.h} | 253 ++--------
src/kudu/tools/tool.proto | 26 +-
src/kudu/tools/tool_action_cluster.cc | 23 +-
25 files changed, 1617 insertions(+), 1326 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d53f81e..324e6fc 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1487,6 +1487,7 @@ add_subdirectory(src/kudu/integration-tests)
add_subdirectory(src/kudu/kserver)
add_subdirectory(src/kudu/master)
add_subdirectory(src/kudu/mini-cluster)
+add_subdirectory(src/kudu/rebalance)
add_subdirectory(src/kudu/rpc)
add_subdirectory(src/kudu/security)
add_subdirectory(src/kudu/sentry)
diff --git a/src/kudu/rebalance/CMakeLists.txt b/src/kudu/rebalance/CMakeLists.txt
new file mode 100644
index 0000000..a7c6522
--- /dev/null
+++ b/src/kudu/rebalance/CMakeLists.txt
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#######################################
+# rebalance: library built from the sources below; links consensus, gutil, kudu_util
+#######################################
+
+add_library(rebalance
+ cluster_status.cc
+ placement_policy_util.cc
+ rebalance_algo.cc
+ rebalancer.cc
+)
+
+target_link_libraries(rebalance
+ consensus
+ gutil
+ kudu_util
+)
+
+#######################################
+# Unit tests
+#######################################
+
+SET_KUDU_TEST_LINK_LIBS(
+ rebalance
+)
+
+ADD_KUDU_TEST(placement_policy_util-test)
+ADD_KUDU_TEST(rebalance-test)
+ADD_KUDU_TEST(rebalance_algo-test)
diff --git a/src/kudu/rebalance/cluster_status.cc b/src/kudu/rebalance/cluster_status.cc
new file mode 100644
index 0000000..685b059
--- /dev/null
+++ b/src/kudu/rebalance/cluster_status.cc
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/rebalance/cluster_status.h"
+
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/tablet/tablet.pb.h" // IWYU pragma: keep
+
+namespace kudu {
+namespace cluster_summary {
+
+const char* const HealthCheckResultToString(HealthCheckResult cr) {  // Return a string representation of 'cr'.
+ switch (cr) {
+ case HealthCheckResult::HEALTHY:
+ return "HEALTHY";
+ case HealthCheckResult::RECOVERING:
+ return "RECOVERING";
+ case HealthCheckResult::UNDER_REPLICATED:
+ return "UNDER_REPLICATED";
+ case HealthCheckResult::CONSENSUS_MISMATCH:
+ return "CONSENSUS_MISMATCH";
+ case HealthCheckResult::UNAVAILABLE:
+ return "UNAVAILABLE";
+ default:  // All five declared enumerators are handled above.
+ LOG(FATAL) << "Unknown CheckResult";
+ }
+}
+
+// Return the name of 'sh' as a string literal; an unrecognized value is logged as FATAL.
+const char* const ServerHealthToString(ServerHealth sh) {
+ switch (sh) {
+ case ServerHealth::HEALTHY:
+ return "HEALTHY";
+ case ServerHealth::UNAUTHORIZED:
+ return "UNAUTHORIZED";
+ case ServerHealth::UNAVAILABLE:
+ return "UNAVAILABLE";
+ case ServerHealth::WRONG_SERVER_UUID:
+ return "WRONG_SERVER_UUID";
+ default:
+ LOG(FATAL) << "Unknown ServerHealth";
+ }
+}
+
+// Return a display string for 'type' ("Master" or "Tablet Server"); an unrecognized value is logged as FATAL.
+const char* const ServerTypeToString(ServerType type) {
+ switch (type) {
+ case ServerType::MASTER:
+ return "Master";
+ case ServerType::TABLET_SERVER:
+ return "Tablet Server";
+ default:
+ LOG(FATAL) << "Unknown ServerType";
+ }
+}
+
+} // namespace cluster_summary
+} // namespace kudu
diff --git a/src/kudu/rebalance/cluster_status.h b/src/kudu/rebalance/cluster_status.h
new file mode 100644
index 0000000..6ca403d
--- /dev/null
+++ b/src/kudu/rebalance/cluster_status.h
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet.pb.h" // IWYU pragma: keep
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace cluster_summary {
+
+// The result of a health check on a tablet.
+// Also used to indicate the health of a table, since the health of a table is
+// the health of its least healthy tablet.
+enum class HealthCheckResult {  // NOTE: declaration order differs from the severity order in TableSummary::TableStatus().
+ // The tablet is healthy.
+ HEALTHY,
+
+ // The tablet has on-going tablet copies.
+ RECOVERING,
+
+ // The tablet has fewer replicas than its table's replication factor and
+ // has no on-going tablet copies.
+ UNDER_REPLICATED,
+
+ // The tablet is missing a majority of its replicas and is unavailable for
+ // writes. If a majority cannot be brought back online, then the tablet
+ // requires manual intervention to recover.
+ UNAVAILABLE,
+
+ // There was a discrepancy among the tablets' consensus configs and the master's.
+ CONSENSUS_MISMATCH,
+};
+
+const char* const HealthCheckResultToString(HealthCheckResult cr);
+
+// Possible types of consensus configs.
+enum class ConsensusConfigType {
+ // A config reported by the master.
+ MASTER,
+ // A config that has been committed.
+ COMMITTED,
+ // A config that has not yet been committed.
+ PENDING,
+};
+
+// Representation of a consensus state: config type, optional term, opid index and leader, plus voter/non-voter UUID sets.
+struct ConsensusState {
+ ConsensusState() = default;  // NOTE(review): 'type' has no default member initializer; confirm callers always set it.
+ ConsensusState(ConsensusConfigType type,
+ boost::optional<int64_t> term,
+ boost::optional<int64_t> opid_index,
+ boost::optional<std::string> leader_uuid,
+ const std::vector<std::string>& voters,
+ const std::vector<std::string>& non_voters)
+ : type(type),
+ term(std::move(term)),
+ opid_index(std::move(opid_index)),
+ leader_uuid(std::move(leader_uuid)),
+ voter_uuids(voters.cbegin(), voters.cend()),
+ non_voter_uuids(non_voters.cbegin(), non_voters.cend()) {
+ // A consensus state must have a term unless it's one sourced from the master.
+ CHECK(type == ConsensusConfigType::MASTER || this->term);  // Check the member, not the moved-from parameter.
+ }
+
+ // Two ConsensusState structs match if they have the same
+ // leader_uuid, the same set of peers, and one of the following holds:
+ // - at least one of them is of type MASTER
+ // - they are configs of the same type and they have the same term
+ bool Matches(const ConsensusState &other) const {
+ bool same_leader_and_peers =
+ leader_uuid == other.leader_uuid &&
+ voter_uuids == other.voter_uuids &&
+ non_voter_uuids == other.non_voter_uuids;
+ if (type == ConsensusConfigType::MASTER ||
+ other.type == ConsensusConfigType::MASTER) {
+ return same_leader_and_peers;  // The term is ignored when either side is a MASTER config.
+ }
+ return type == other.type && term == other.term && same_leader_and_peers;
+ }
+
+ ConsensusConfigType type;
+ boost::optional<int64_t> term;
+ boost::optional<int64_t> opid_index;  // Stored but not compared by Matches().
+ boost::optional<std::string> leader_uuid;
+ std::set<std::string> voter_uuids;  // Built from the 'voters' constructor argument.
+ std::set<std::string> non_voter_uuids;  // Built from the 'non_voters' constructor argument.
+};
+
+// Represents the health of a server.
+enum class ServerHealth {
+ // The server is healthy.
+ HEALTHY,
+
+ // The server rejected attempts to communicate as unauthorized.
+ UNAUTHORIZED,
+
+ // The server can't be contacted.
+ UNAVAILABLE,
+
+ // The server reported an unexpected UUID.
+ WRONG_SERVER_UUID,
+};
+
+// Return a string representation of 'sh'.
+const char* const ServerHealthToString(ServerHealth sh);
+
+// A summary of a server health check.
+struct ServerHealthSummary {
+ std::string uuid;
+ std::string address;
+ std::string ts_location;
+ boost::optional<std::string> version;
+ ServerHealth health = ServerHealth::HEALTHY;
+ Status status = Status::OK();
+};
+
+// A summary of the state of a table.
+struct TableSummary {
+ std::string id;
+ std::string name;
+ int replication_factor = 0;
+ int healthy_tablets = 0;
+ int recovering_tablets = 0;
+ int underreplicated_tablets = 0;
+ int consensus_mismatch_tablets = 0;
+ int unavailable_tablets = 0;
+
+ int TotalTablets() const {  // Sum of the five per-state tablet counters above.
+ return healthy_tablets + recovering_tablets + underreplicated_tablets +
+ consensus_mismatch_tablets + unavailable_tablets;
+ }
+
+ int UnhealthyTablets() const {  // Every counted tablet that is not in healthy_tablets.
+ return TotalTablets() - healthy_tablets;
+ }
+
+ // Summarize the table's status as that of its least healthy tablet, checking
+ // UNAVAILABLE, CONSENSUS_MISMATCH, UNDER_REPLICATED, RECOVERING in that order.
+ HealthCheckResult TableStatus() const {
+ if (unavailable_tablets > 0) {
+ return HealthCheckResult::UNAVAILABLE;
+ }
+ if (consensus_mismatch_tablets > 0) {
+ return HealthCheckResult::CONSENSUS_MISMATCH;
+ }
+ if (underreplicated_tablets > 0) {
+ return HealthCheckResult::UNDER_REPLICATED;
+ }
+ if (recovering_tablets > 0) {
+ return HealthCheckResult::RECOVERING;
+ }
+ return HealthCheckResult::HEALTHY;  // No non-healthy counter is positive (also the case for an empty table).
+ }
+};
+
+// Types of Kudu servers.
+enum class ServerType {
+ MASTER,
+ TABLET_SERVER,
+};
+
+// Return a string representation of 'type'.
+const char* const ServerTypeToString(ServerType type);
+
+// A summary of the state of a tablet replica.
+struct ReplicaSummary {
+ std::string ts_uuid;
+ boost::optional<std::string> ts_address;
+ bool ts_healthy = false;
+ bool is_leader = false;
+ bool is_voter = false;
+ tablet::TabletStatePB state = tablet::UNKNOWN;
+ boost::optional<tablet::TabletStatusPB> status_pb;
+ boost::optional<ConsensusState> consensus_state;
+};
+
+// A summary of the state of a tablet: identity, health check result and status text, master and per-replica state.
+struct TabletSummary {
+ std::string id;
+ std::string table_id;
+ std::string table_name;
+ HealthCheckResult result = HealthCheckResult::HEALTHY;  // Initialized like ServerHealthSummary::health; was indeterminate.
+ std::string status;
+ ConsensusState master_cstate;  // NOTE(review): default-constructed; its 'type' has no initializer.
+ std::vector<ReplicaSummary> replicas;
+};
+
+typedef std::map<std::string, ConsensusState> ConsensusStateMap;
+
+// Container for information of cluster status.
+struct ClusterStatus {
+
+ // Health summaries for master and tablet servers.
+ std::vector<ServerHealthSummary> master_summaries;
+ std::vector<ServerHealthSummary> tserver_summaries;
+
+ // Information about the master consensus configuration.
+ std::vector<std::string> master_uuids;
+ bool master_consensus_conflict = false;
+ ConsensusStateMap master_consensus_state_map;
+
+ // Detailed information about each table and tablet.
+ // Tablet information includes consensus state.
+ std::vector<TabletSummary> tablet_summaries;
+ std::vector<TableSummary> table_summaries;
+};
+
+} // namespace cluster_summary
+} // namespace kudu
diff --git a/src/kudu/tools/placement_policy_util-test.cc b/src/kudu/rebalance/placement_policy_util-test.cc
similarity index 99%
rename from src/kudu/tools/placement_policy_util-test.cc
rename to src/kudu/rebalance/placement_policy_util-test.cc
index 6eca2fe..60dd927 100644
--- a/src/kudu/tools/placement_policy_util-test.cc
+++ b/src/kudu/rebalance/placement_policy_util-test.cc
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "kudu/tools/placement_policy_util.h"
+#include "kudu/rebalance/placement_policy_util.h"
#include <cstdint>
#include <iostream>
@@ -32,8 +32,8 @@
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tools/rebalance_algo.h"
-#include "kudu/tools/rebalancer.h"
+#include "kudu/rebalance/rebalance_algo.h"
+#include "kudu/rebalance/rebalancer.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -47,7 +47,7 @@ using std::vector;
using strings::Substitute;
namespace kudu {
-namespace tools {
+namespace rebalance {
// TODO(aserbin): consider renaming the structures below XxxInfo --> Xxx
@@ -729,5 +729,5 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsNoneEvenRF) {
NO_FATALS(RunTest(configs));
}
-} // namespace tools
+} // namespace rebalance
} // namespace kudu
diff --git a/src/kudu/tools/placement_policy_util.cc b/src/kudu/rebalance/placement_policy_util.cc
similarity index 97%
rename from src/kudu/tools/placement_policy_util.cc
rename to src/kudu/rebalance/placement_policy_util.cc
index b92ff3b..2036f65 100644
--- a/src/kudu/tools/placement_policy_util.cc
+++ b/src/kudu/rebalance/placement_policy_util.cc
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "kudu/tools/placement_policy_util.h"
+#include "kudu/rebalance/placement_policy_util.h"
#include <cstdint>
#include <functional>
@@ -33,8 +33,9 @@
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tools/ksck_results.h"
-#include "kudu/tools/rebalance_algo.h"
+#include "kudu/rebalance/cluster_status.h"
+#include "kudu/rebalance/rebalance_algo.h"
+#include "kudu/rebalance/rebalancer.h"
#include "kudu/util/status.h"
using std::multimap;
@@ -44,7 +45,7 @@ using std::vector;
using strings::Substitute;
namespace kudu {
-namespace tools {
+namespace rebalance {
namespace {
@@ -239,12 +240,12 @@ Status BuildTabletsPlacementInfo(
for (const auto& tablet_summary : raw_info.tablet_summaries) {
const auto& tablet_id = tablet_summary.id;
// TODO(aserbin): process RF=1 tablets as necessary
- if (tablet_summary.result != KsckCheckResult::HEALTHY &&
- tablet_summary.result != KsckCheckResult::RECOVERING) {
+ if (tablet_summary.result != cluster_summary::HealthCheckResult::HEALTHY &&
+ tablet_summary.result != cluster_summary::HealthCheckResult::RECOVERING) {
VLOG(1) << Substitute("tablet $0: not considering replicas for movement "
"since the tablet's status is '$1'",
tablet_id,
- KsckCheckResultToString(tablet_summary.result));
+ cluster_summary::HealthCheckResultToString(tablet_summary.result));
continue;
}
EmplaceOrDie(&tablet_to_table_id, tablet_id, tablet_summary.table_id);
@@ -454,5 +455,5 @@ Status FindMovesToReimposePlacementPolicy(
return Status::OK();
}
-} // namespace tools
+} // namespace rebalance
} // namespace kudu
diff --git a/src/kudu/tools/placement_policy_util.h b/src/kudu/rebalance/placement_policy_util.h
similarity index 98%
rename from src/kudu/tools/placement_policy_util.h
rename to src/kudu/rebalance/placement_policy_util.h
index 7547b8e..ef02483 100644
--- a/src/kudu/tools/placement_policy_util.h
+++ b/src/kudu/rebalance/placement_policy_util.h
@@ -23,17 +23,17 @@
#include <boost/optional/optional.hpp>
-#include "kudu/tools/rebalancer.h"
+#include "kudu/rebalance/rebalancer.h"
#include "kudu/util/status.h"
namespace kudu {
-namespace tools {
+namespace rebalance {
+
+struct ClusterLocalityInfo;
// Below are the structures to describe Kudu entities such as tablet replicas,
// tablets, and tables from the perspective of the placement policy constraints.
-struct ClusterLocalityInfo;
-
enum class ReplicaRole {
LEADER,
FOLLOWER_VOTER,
@@ -118,5 +118,5 @@ Status FindMovesToReimposePlacementPolicy(
const std::vector<PlacementPolicyViolationInfo>& violations_info,
std::vector<Rebalancer::ReplicaMove>* replicas_to_remove);
-} // namespace tools
+} // namespace rebalance
} // namespace kudu
diff --git a/src/kudu/tools/rebalance-test.cc b/src/kudu/rebalance/rebalance-test.cc
similarity index 92%
rename from src/kudu/tools/rebalance-test.cc
rename to src/kudu/rebalance/rebalance-test.cc
index 807dc82..a46c454 100644
--- a/src/kudu/tools/rebalance-test.cc
+++ b/src/kudu/rebalance/rebalance-test.cc
@@ -27,11 +27,16 @@
#include <gtest/gtest.h>
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tools/ksck_results.h"
-#include "kudu/tools/rebalance_algo.h"
-#include "kudu/tools/rebalancer.h"
+#include "kudu/rebalance/cluster_status.h"
+#include "kudu/rebalance/rebalance_algo.h"
+#include "kudu/rebalance/rebalancer.h"
#include "kudu/util/test_macros.h"
+using kudu::cluster_summary::ReplicaSummary;
+using kudu::cluster_summary::ServerHealthSummary;
+using kudu::cluster_summary::TableSummary;
+using kudu::cluster_summary::TabletSummary;
+
using std::inserter;
using std::ostream;
using std::sort;
@@ -41,26 +46,26 @@ using std::vector;
using strings::Substitute;
namespace kudu {
-namespace tools {
+namespace rebalance {
namespace {
-struct KsckReplicaSummaryInput {
+struct ReplicaSummaryInput {
std::string ts_uuid;
bool is_voter;
};
-struct KsckServerHealthSummaryInput {
+struct ServerHealthSummaryInput {
std::string uuid;
};
-struct KsckTabletSummaryInput {
+struct TabletSummaryInput {
std::string id;
std::string table_id;
- std::vector<KsckReplicaSummaryInput> replicas;
+ std::vector<ReplicaSummaryInput> replicas;
};
-struct KsckTableSummaryInput {
+struct TableSummaryInput {
std::string id;
int replication_factor;
};
@@ -68,9 +73,9 @@ struct KsckTableSummaryInput {
// The input to build KsckResults data. Contains relevant sub-fields of the
// KsckResults to use in the test.
struct KsckResultsInput {
- vector<KsckServerHealthSummaryInput> tserver_summaries;
- vector<KsckTabletSummaryInput> tablet_summaries;
- vector<KsckTableSummaryInput> table_summaries;
+ vector<ServerHealthSummaryInput> tserver_summaries;
+ vector<TabletSummaryInput> tablet_summaries;
+ vector<TableSummaryInput> table_summaries;
};
// The configuration for the test.
@@ -85,22 +90,22 @@ struct KsckResultsTestConfig {
ClusterRawInfo GenerateRawClusterInfo(const KsckResultsInput& input) {
ClusterRawInfo raw_info;
{
- vector<KsckServerHealthSummary>& summaries = raw_info.tserver_summaries;
+ vector<ServerHealthSummary>& summaries = raw_info.tserver_summaries;
for (const auto& summary_input : input.tserver_summaries) {
- KsckServerHealthSummary summary;
+ ServerHealthSummary summary;
summary.uuid = summary_input.uuid;
summaries.emplace_back(std::move(summary));
}
}
{
- vector<KsckTabletSummary>& summaries = raw_info.tablet_summaries;
+ vector<TabletSummary>& summaries = raw_info.tablet_summaries;
for (const auto& summary_input : input.tablet_summaries) {
- KsckTabletSummary summary;
+ TabletSummary summary;
summary.id = summary_input.id;
summary.table_id = summary_input.table_id;
auto& replicas = summary.replicas;
for (const auto& replica_input : summary_input.replicas) {
- KsckReplicaSummary replica;
+ ReplicaSummary replica;
replica.ts_uuid = replica_input.ts_uuid;
replica.is_voter = replica_input.is_voter;
replicas.emplace_back(std::move(replica));
@@ -109,9 +114,9 @@ ClusterRawInfo GenerateRawClusterInfo(const KsckResultsInput& input) {
}
}
{
- vector<KsckTableSummary>& summaries = raw_info.table_summaries;
+ vector<TableSummary>& summaries = raw_info.table_summaries;
for (const auto& summary_input : input.table_summaries) {
- KsckTableSummary summary;
+ TableSummary summary;
summary.id = summary_input.id;
summary.replication_factor = summary_input.replication_factor;
summaries.emplace_back(std::move(summary));
@@ -449,5 +454,5 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, DoNotMoveRf1Replicas) {
NO_FATALS(RunTest(rebalancer_config, test_configs));
}
-} // namespace tools
+} // namespace rebalance
} // namespace kudu
diff --git a/src/kudu/tools/rebalance_algo-test.cc b/src/kudu/rebalance/rebalance_algo-test.cc
similarity index 99%
rename from src/kudu/tools/rebalance_algo-test.cc
rename to src/kudu/rebalance/rebalance_algo-test.cc
index 2f7ddf3..8cfd3b2 100644
--- a/src/kudu/tools/rebalance_algo-test.cc
+++ b/src/kudu/rebalance/rebalance_algo-test.cc
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "kudu/tools/rebalance_algo.h"
+#include "kudu/rebalance/rebalance_algo.h"
#include <algorithm>
#include <cstddef>
@@ -42,11 +42,12 @@
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+// IWYU pragma: keep.
namespace kudu {
-namespace tools {
+namespace rebalance {
struct TestClusterConfig;
-} // namespace tools
-} // namespace kudu
+} // namespace rebalance
+} // namespace kudu
#define VERIFY_MOVES(test_config) \
do { \
@@ -75,7 +76,7 @@ using std::vector;
using strings::Substitute;
namespace kudu {
-namespace tools {
+namespace rebalance {
struct TablePerServerReplicas {
const string table_id;
@@ -1114,5 +1115,5 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingSimpleMT) {
VERIFY_LOCATION_BALANCING_MOVES(kConfigs);
}
-} // namespace tools
+} // namespace rebalance
} // namespace kudu
diff --git a/src/kudu/tools/rebalance_algo.cc b/src/kudu/rebalance/rebalance_algo.cc
similarity index 99%
rename from src/kudu/tools/rebalance_algo.cc
rename to src/kudu/rebalance/rebalance_algo.cc
index eaec6da..b2870cb 100644
--- a/src/kudu/tools/rebalance_algo.cc
+++ b/src/kudu/rebalance/rebalance_algo.cc
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "kudu/tools/rebalance_algo.h"
+#include "kudu/rebalance/rebalance_algo.h"
#include <algorithm>
#include <cmath>
@@ -51,7 +51,7 @@ using std::vector;
using strings::Substitute;
namespace kudu {
-namespace tools {
+namespace rebalance {
namespace {
@@ -195,7 +195,6 @@ Status RebalancingAlgo::ApplyMove(const TableReplicaMove& move,
TwoDimensionalGreedyAlgo::TwoDimensionalGreedyAlgo(EqualSkewOption opt)
: equal_skew_opt_(opt),
- random_device_(),
generator_(random_device_()) {
}
@@ -618,5 +617,5 @@ Status LocationBalancingAlgo::FindBestMove(
return Status::OK();
}
-} // namespace tools
+} // namespace rebalance
} // namespace kudu
diff --git a/src/kudu/tools/rebalance_algo.h b/src/kudu/rebalance/rebalance_algo.h
similarity index 99%
rename from src/kudu/tools/rebalance_algo.h
rename to src/kudu/rebalance/rebalance_algo.h
index 22da043..f5b790a 100644
--- a/src/kudu/tools/rebalance_algo.h
+++ b/src/kudu/rebalance/rebalance_algo.h
@@ -33,7 +33,7 @@ template <class T> class optional;
} // namespace boost
namespace kudu {
-namespace tools {
+namespace rebalance {
// A map from a count of replicas to a server identifier. The "reversed"
// relationship facilitates finding the servers with the maximum and minimum
@@ -286,5 +286,5 @@ class LocationBalancingAlgo : public RebalancingAlgo {
const double load_imbalance_threshold_;
};
-} // namespace tools
+} // namespace rebalance
} // namespace kudu
diff --git a/src/kudu/rebalance/rebalancer.cc b/src/kudu/rebalance/rebalancer.cc
new file mode 100644
index 0000000..b3c1c6c
--- /dev/null
+++ b/src/kudu/rebalance/rebalancer.cc
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rebalance/rebalancer.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+
+#include "kudu/consensus/quorum_util.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rebalance/cluster_status.h"
+#include "kudu/rebalance/placement_policy_util.h"
+#include "kudu/rebalance/rebalance_algo.h"
+#include "kudu/util/status.h"
+
+using kudu::cluster_summary::HealthCheckResult;
+using kudu::cluster_summary::HealthCheckResultToString;
+using kudu::cluster_summary::ServerHealth;
+
+using std::numeric_limits;
+using std::set;
+using std::string;
+using std::unordered_map;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace rebalance {
+
+Rebalancer::Config::Config(
+ vector<string> ignored_tservers_param,
+ vector<string> master_addresses,
+ vector<string> table_filters,
+ size_t max_moves_per_server,
+ size_t max_staleness_interval_sec,
+ int64_t max_run_time_sec,
+ bool move_rf1_replicas,
+ bool output_replica_distribution_details,
+ bool run_policy_fixer,
+ bool run_cross_location_rebalancing,
+ bool run_intra_location_rebalancing,
+ double load_imbalance_threshold)
+ : ignored_tservers(ignored_tservers_param.begin(), ignored_tservers_param.end()),
+ master_addresses(std::move(master_addresses)),
+ table_filters(std::move(table_filters)),
+ max_moves_per_server(max_moves_per_server),
+ max_staleness_interval_sec(max_staleness_interval_sec),
+ max_run_time_sec(max_run_time_sec),
+ move_rf1_replicas(move_rf1_replicas),
+ output_replica_distribution_details(output_replica_distribution_details),
+ run_policy_fixer(run_policy_fixer),
+ run_cross_location_rebalancing(run_cross_location_rebalancing),
+ run_intra_location_rebalancing(run_intra_location_rebalancing),
+ load_imbalance_threshold(load_imbalance_threshold) {
+ DCHECK_GE(max_moves_per_server, 0);
+}
+
+Rebalancer::Rebalancer(Config config)
+ : config_(std::move(config)) {
+}
+
+// Given high-level description of moves, find tablets with replicas at the
+// corresponding tablet servers to satisfy those high-level descriptions.
+// The idea is to find all tablets of the specified table that would have a
+// replica at the source server, but would not have a replica at the destination
+// server. That is to satisfy the restriction of having no more than one replica
+// of the same tablet per server.
+//
+// An additional constraint: it's better not to move leader replicas, if
+// possible. If a client has a write operation in progress, moving leader
+// replicas of affected tablets would force the client to re-resolve new leaders
+// and retry the operations. Moving leader replicas is used as a last resort
+// when no other candidates are left.
+Status Rebalancer::FindReplicas(const TableReplicaMove& move,
+ const ClusterRawInfo& raw_info,
+ vector<string>* tablet_ids) {
+ const auto& table_id = move.table_id;
+
+ // Tablet ids of replicas on the source tserver that are non-leaders.
+ vector<string> tablet_uuids_src;
+ // Tablet ids of replicas on the source tserver that are leaders.
+ vector<string> tablet_uuids_src_leaders;
+ // UUIDs of tablets of the selected table at the destination tserver.
+ vector<string> tablet_uuids_dst;
+
+ for (const auto& tablet_summary : raw_info.tablet_summaries) {
+ if (tablet_summary.table_id != table_id) {
+ continue;
+ }
+ if (tablet_summary.result != HealthCheckResult::HEALTHY) {
+ VLOG(1) << Substitute("table $0: not considering replicas of tablet $1 "
+ "as candidates for movement since the tablet's "
+ "status is '$2'",
+ table_id, tablet_summary.id,
+ HealthCheckResultToString(tablet_summary.result));
+ continue;
+ }
+ for (const auto& replica_summary : tablet_summary.replicas) {
+ if (replica_summary.ts_uuid != move.from &&
+ replica_summary.ts_uuid != move.to) {
+ continue;
+ }
+ if (!replica_summary.ts_healthy) {
+ VLOG(1) << Substitute("table $0: not considering replica movement "
+ "from $1 to $2 since server $3 is not healthy",
+ table_id,
+ move.from, move.to, replica_summary.ts_uuid);
+ continue;
+ }
+ if (replica_summary.ts_uuid == move.from) {
+ if (replica_summary.is_leader) {
+ tablet_uuids_src_leaders.emplace_back(tablet_summary.id);
+ } else {
+ tablet_uuids_src.emplace_back(tablet_summary.id);
+ }
+ } else {
+ DCHECK_EQ(move.to, replica_summary.ts_uuid);
+ tablet_uuids_dst.emplace_back(tablet_summary.id);
+ }
+ }
+ }
+ sort(tablet_uuids_src.begin(), tablet_uuids_src.end());
+ sort(tablet_uuids_dst.begin(), tablet_uuids_dst.end());
+
+ vector<string> tablet_uuids;
+ set_difference(
+ tablet_uuids_src.begin(), tablet_uuids_src.end(),
+ tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
+ inserter(tablet_uuids, tablet_uuids.begin()));
+
+ if (!tablet_uuids.empty()) {
+ // If there are tablets with non-leader replicas at the source server,
+ // those are the best candidates for movement.
+ tablet_ids->swap(tablet_uuids);
+ return Status::OK();
+ }
+
+ // If no tablets with non-leader replicas were found, resort to tablets with
+ // leader replicas at the source server.
+ DCHECK(tablet_uuids.empty());
+ sort(tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end());
+ set_difference(
+ tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end(),
+ tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
+ inserter(tablet_uuids, tablet_uuids.begin()));
+
+ tablet_ids->swap(tablet_uuids);
+
+ return Status::OK();
+}
+
+void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves,
+ vector<ReplicaMove>* replica_moves) {
+ unordered_set<string> tablet_uuids;
+ vector<ReplicaMove> filtered_replica_moves;
+ for (auto& move_op : *replica_moves) {
+ const auto& tablet_uuid = move_op.tablet_uuid;
+ if (ContainsKey(scheduled_moves, tablet_uuid)) {
+ // There is a move operation in progress for the tablet, don't schedule
+ // another one.
+ continue;
+ }
+ if (PREDICT_TRUE(tablet_uuids.emplace(tablet_uuid).second)) {
+ filtered_replica_moves.emplace_back(std::move(move_op));
+ } else {
+ // Rationale behind the unique tablet constraint: the implementation of
+ // the Run() method is designed to re-order operations suggested by the
+ // high-level algorithm to use the op-count-per-tablet-server capacity
+ // as much as possible. Right now, the RunStep() method outputs only one
+ // move operation per tablet in every batch. The code below is to
+ // enforce the contract between Run() and RunStep() methods.
+ LOG(DFATAL) << "detected multiple replica move operations for the same "
+ "tablet " << tablet_uuid;
+ }
+ }
+ *replica_moves = std::move(filtered_replica_moves);
+}
+
+Status Rebalancer::FilterCrossLocationTabletCandidates(
+ const unordered_map<string, string>& location_by_ts_id,
+ const TabletsPlacementInfo& placement_info,
+ const TableReplicaMove& move,
+ vector<string>* tablet_ids) {
+ DCHECK(tablet_ids);
+
+ if (tablet_ids->empty()) {
+ // Nothing to filter.
+ return Status::OK();
+ }
+
+ const auto& dst_location = FindOrDie(location_by_ts_id, move.to);
+ const auto& src_location = FindOrDie(location_by_ts_id, move.from);
+
+ // Sanity check: the source and the destination tablet servers should be
+ // in different locations.
+ if (src_location == dst_location) {
+ return Status::InvalidArgument(Substitute(
+ "moving replicas of table $0: the same location '$1' for both "
+ "the source ($2) and the destination ($3) tablet servers",
+ move.table_id, src_location, move.from, move.to));
+ }
+ if (dst_location.empty()) {
+ // The destination location is not specified, so no restrictions on the
+ // destination location to check for.
+ return Status::OK();
+ }
+
+ vector<string> tablet_ids_filtered;
+ for (auto& tablet_id : *tablet_ids) {
+ const auto& replica_count_info = FindOrDie(
+ placement_info.tablet_location_info, tablet_id);
+ const auto* count_ptr = FindOrNull(replica_count_info, dst_location);
+ if (count_ptr == nullptr) {
+ // Nothing else to clarify: not a single replica in the destination
+ // location for this candidate tablet.
+ tablet_ids_filtered.emplace_back(std::move(tablet_id));
+ continue;
+ }
+ const auto location_replica_num = *count_ptr;
+ const auto& table_id = FindOrDie(placement_info.tablet_to_table_id, tablet_id);
+ const auto& table_info = FindOrDie(placement_info.tables_info, table_id);
+ const auto rf = table_info.replication_factor;
+ // In case of RF=2*N+1, losing (N + 1) replicas means losing the majority.
+ // In case of RF=2*N, losing at least N replicas means losing the majority.
+ const auto replica_num_threshold = rf % 2 ? consensus::MajoritySize(rf)
+ : rf / 2;
+ if (location_replica_num + 1 >= replica_num_threshold) {
+ VLOG(1) << Substitute("destination location '$0' for candidate tablet $1 "
+ "already contains $2 of $3 replicas",
+ dst_location, tablet_id, location_replica_num, rf);
+ continue;
+ }
+ // No majority of replicas in the destination location: it's an OK candidate.
+ tablet_ids_filtered.emplace_back(std::move(tablet_id));
+ }
+
+ *tablet_ids = std::move(tablet_ids_filtered);
+
+ return Status::OK();
+}
+
+Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
+ const MovesInProgress& moves_in_progress,
+ ClusterInfo* info) const {
+ DCHECK(info);
+
+ // tserver UUID --> total replica count of all table's tablets at the server
+ typedef unordered_map<string, int32_t> TableReplicasAtServer;
+
+ // The result information to build.
+ ClusterInfo result_info;
+
+ unordered_map<string, int32_t> tserver_replicas_count;
+ unordered_map<string, TableReplicasAtServer> table_replicas_info;
+
+ // Build a set of tables with RF=1 (single replica tables).
+ unordered_set<string> rf1_tables;
+ if (!config_.move_rf1_replicas) {
+ for (const auto& s : raw_info.table_summaries) {
+ if (s.replication_factor == 1) {
+ rf1_tables.emplace(s.id);
+ }
+ }
+ }
+
+ auto& ts_uuids_by_location = result_info.locality.servers_by_location;
+ auto& location_by_ts_uuid = result_info.locality.location_by_ts_id;
+ for (const auto& summary : raw_info.tserver_summaries) {
+ const auto& ts_id = summary.uuid;
+ const auto& ts_location = summary.ts_location;
+ VLOG(1) << Substitute("found tserver $0 at location '$1'", ts_id, ts_location);
+ EmplaceOrDie(&location_by_ts_uuid, ts_id, ts_location);
+ auto& ts_ids = LookupOrEmplace(&ts_uuids_by_location,
+ ts_location, set<string>());
+ InsertOrDie(&ts_ids, ts_id);
+ }
+
+ for (const auto& s : raw_info.tserver_summaries) {
+ if (s.health != ServerHealth::HEALTHY) {
+ LOG(INFO) << Substitute("skipping tablet server $0 ($1) because of its "
+ "non-HEALTHY status ($2)",
+ s.uuid, s.address,
+ ServerHealthToString(s.health));
+ continue;
+ }
+ tserver_replicas_count.emplace(s.uuid, 0);
+ }
+
+ for (const auto& tablet : raw_info.tablet_summaries) {
+ if (!config_.move_rf1_replicas) {
+ if (rf1_tables.find(tablet.table_id) != rf1_tables.end()) {
+ LOG(INFO) << Substitute("tablet $0 of table '$1' ($2) has single replica, skipping",
+ tablet.id, tablet.table_name, tablet.table_id);
+ continue;
+ }
+ }
+
+ // Check if it's one of the tablets which are currently being rebalanced.
+ // If so, interpret the move as successfully completed, updating the
+ // replica counts correspondingly.
+ const auto it_pending_moves = moves_in_progress.find(tablet.id);
+ if (it_pending_moves != moves_in_progress.end()) {
+ const auto& move_info = it_pending_moves->second;
+ bool is_target_replica_present = false;
+ // Verify that the target replica is present in the config.
+ for (const auto& tr : tablet.replicas) {
+ if (tr.ts_uuid == move_info.ts_uuid_to) {
+ is_target_replica_present = true;
+ break;
+ }
+ }
+ // If the target replica is present, it will be processed in the code
+ // below. Otherwise, it's necessary to pretend as if the target replica
+ // is in the config already: the idea is to count in the absent target
+ // replica as if the movement has successfully completed already.
+ auto it = tserver_replicas_count.find(move_info.ts_uuid_to);
+ if (!is_target_replica_present && !move_info.ts_uuid_to.empty() &&
+ it != tserver_replicas_count.end()) {
+ it->second++;
+ auto table_ins = table_replicas_info.emplace(
+ tablet.table_id, TableReplicasAtServer());
+ TableReplicasAtServer& replicas_at_server = table_ins.first->second;
+
+ auto replicas_ins = replicas_at_server.emplace(move_info.ts_uuid_to, 0);
+ replicas_ins.first->second++;
+ }
+ }
+
+ for (const auto& ri : tablet.replicas) {
+ // Increment total count of replicas at the tablet server.
+ auto it = tserver_replicas_count.find(ri.ts_uuid);
+ if (it == tserver_replicas_count.end()) {
+ string msg = Substitute("skipping replica at tserver $0", ri.ts_uuid);
+ if (ri.ts_address) {
+ msg += " (" + *ri.ts_address + ")";
+ }
+ msg += " since it's not reported among known tservers";
+ LOG(INFO) << msg;
+ continue;
+ }
+ bool do_count_replica = true;
+ if (it_pending_moves != moves_in_progress.end()) {
+ const auto& move_info = it_pending_moves->second;
+ if (move_info.ts_uuid_from == ri.ts_uuid) {
+ DCHECK(!ri.ts_uuid.empty());
+ // The source replica of the scheduled replica movement operation
+ // is still in the config. Interpreting the move as successfully
+ // completed, so the source replica should not be counted in.
+ do_count_replica = false;
+ }
+ }
+ if (do_count_replica) {
+ it->second++;
+ }
+
+ auto table_ins = table_replicas_info.emplace(
+ tablet.table_id, TableReplicasAtServer());
+ TableReplicasAtServer& replicas_at_server = table_ins.first->second;
+
+ auto replicas_ins = replicas_at_server.emplace(ri.ts_uuid, 0);
+ if (do_count_replica) {
+ replicas_ins.first->second++;
+ }
+ }
+ }
+
+ // Check for the consistency of information derived from the health report.
+ for (const auto& elem : tserver_replicas_count) {
+ const auto& ts_uuid = elem.first;
+ int32_t count_by_table_info = 0;
+ for (auto& e : table_replicas_info) {
+ count_by_table_info += e.second[ts_uuid];
+ }
+ if (elem.second != count_by_table_info) {
+ return Status::Corruption("inconsistent cluster state returned by check");
+ }
+ }
+
+ // Populate ClusterBalanceInfo::servers_by_total_replica_count
+ auto& servers_by_count = result_info.balance.servers_by_total_replica_count;
+ for (const auto& elem : tserver_replicas_count) {
+ servers_by_count.emplace(elem.second, elem.first);
+ }
+
+ // Populate ClusterBalanceInfo::table_info_by_skew
+ auto& table_info_by_skew = result_info.balance.table_info_by_skew;
+ for (const auto& elem : table_replicas_info) {
+ const auto& table_id = elem.first;
+ int32_t max_count = numeric_limits<int32_t>::min();
+ int32_t min_count = numeric_limits<int32_t>::max();
+ TableBalanceInfo tbi;
+ tbi.table_id = table_id;
+ for (const auto& e : elem.second) {
+ const auto& ts_uuid = e.first;
+ const auto replica_count = e.second;
+ tbi.servers_by_replica_count.emplace(replica_count, ts_uuid);
+ max_count = std::max(replica_count, max_count);
+ min_count = std::min(replica_count, min_count);
+ }
+ table_info_by_skew.emplace(max_count - min_count, std::move(tbi));
+ }
+ // TODO(aserbin): add sanity checks on the result.
+ *info = std::move(result_info);
+
+ return Status::OK();
+}
+
+
+} // namespace rebalance
+} // namespace kudu
diff --git a/src/kudu/rebalance/rebalancer.h b/src/kudu/rebalance/rebalancer.h
new file mode 100644
index 0000000..50be60c
--- /dev/null
+++ b/src/kudu/rebalance/rebalancer.h
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+
+#include "kudu/rebalance/cluster_status.h"
+#include "kudu/util/status.h"
+
+
+namespace kudu {
+namespace rebalance {
+
+struct ClusterInfo;
+struct TableReplicaMove;
+struct TabletsPlacementInfo;
+
+// Sub-set of fields from ClusterResult which are relevant to the rebalancing.
+struct ClusterRawInfo {
+ std::vector<cluster_summary::ServerHealthSummary> tserver_summaries;
+ std::vector<cluster_summary::TableSummary> table_summaries;
+ std::vector<cluster_summary::TabletSummary> tablet_summaries;
+};
+
+// A class implementing logic for Kudu cluster rebalancing.
+// A Rebalancer object encapsulates structs with cluster information
+// and functions needed to calculate and implement replica moves.
+class Rebalancer {
+ public:
+ // Configuration parameters for the rebalancer aggregated into a struct.
+ struct Config {
+ static constexpr double kLoadImbalanceThreshold = 1.0;
+
+ // NOLINTNEXTLINE(google-explicit-constructor)
+ Config(std::vector<std::string> ignored_tservers_param = {},
+ std::vector<std::string> master_addresses = {},
+ std::vector<std::string> table_filters = {},
+ size_t max_moves_per_server = 5,
+ size_t max_staleness_interval_sec = 300,
+ int64_t max_run_time_sec = 0,
+ bool move_rf1_replicas = false,
+ bool output_replica_distribution_details = false,
+ bool run_policy_fixer = true,
+ bool run_cross_location_rebalancing = true,
+ bool run_intra_location_rebalancing = true,
+ double load_imbalance_threshold = kLoadImbalanceThreshold);
+
+ // UUIDs of ignored servers. If empty, allow to run the
+ // rebalancing only when all tablet servers in cluster are healthy.
+ // If not empty, allow to run the rebalancing when servers in
+ // ignored_tservers are unhealthy.
+ std::unordered_set<std::string> ignored_tservers;
+
+ // Kudu masters' RPC endpoints.
+ std::vector<std::string> master_addresses;
+
+ // Names of tables to balance. If empty, every table and the whole cluster
+ // will be balanced.
+ std::vector<std::string> table_filters;
+
+ // Maximum number of move operations to run concurrently on one server.
+ // An 'operation on a server' means a move operation where either source or
+ // destination replica is located on the specified server.
+ size_t max_moves_per_server;
+
+ // Maximum duration of the 'staleness' interval, when the rebalancer cannot
+ // make any progress in scheduling new moves and no prior scheduled moves
+ // are left, even if re-synchronizing against the cluster's state again and
+ // again. Such a staleness usually happens in case of a persistent problem
+ // with the cluster or when some unexpected concurrent activity is present
+ // (such as automatic recovery of failed replicas, etc.).
+ size_t max_staleness_interval_sec;
+
+ // Maximum run time, in seconds.
+ int64_t max_run_time_sec;
+
+ // Whether to move replicas of tablets with replication factor of one.
+ bool move_rf1_replicas;
+
+ // Whether Rebalancer::PrintStats() should output per-table and per-server
+ // replica distribution details.
+ bool output_replica_distribution_details;
+
+ // In case of multi-location cluster, whether to detect and fix placement
+ // policy violations. Fixing placement policy violations involves moving
+ // tablet replicas across different locations in the cluster.
+ // This setting is applicable to multi-location clusters only.
+ bool run_policy_fixer;
+
+ // In case of multi-location cluster, whether to move tablet replicas
+ // between locations in an attempt to spread tablet replicas among locations
+ // evenly (equalizing loads of locations throughout the cluster).
+ // This setting is applicable to multi-location clusters only.
+ bool run_cross_location_rebalancing;
+
+ // In case of multi-location cluster, whether to rebalance tablet replica
+ // distribution within each location.
+ // This setting is applicable to multi-location clusters only.
+ bool run_intra_location_rebalancing;
+
+ // The per-table location load imbalance threshold for the cross-location
+ // balancing algorithm.
+ double load_imbalance_threshold;
+ };
+
+ // Represents a concrete move of a replica from one tablet server to another.
+ // Formed logically from a TableReplicaMove by specifying a tablet for the move.
+ // Originally from "tools/rebalancer.h"
+ struct ReplicaMove {
+ std::string tablet_uuid;
+ std::string ts_uuid_from;
+ std::string ts_uuid_to;
+ boost::optional<int64_t> config_opid_idx; // for CAS-enabled Raft changes
+ };
+
+ enum class RunStatus {
+ UNKNOWN,
+ CLUSTER_IS_BALANCED,
+ TIMED_OUT,
+ };
+
+ // A helper type: key is tablet UUID which corresponds to value.tablet_uuid.
+ // Originally from "tools/rebalancer.h"
+ typedef std::unordered_map<std::string, ReplicaMove> MovesInProgress;
+
+ explicit Rebalancer(Config config);
+
+ protected:
+ // Helper class to find and schedule next available rebalancing move operation
+ // and track already scheduled ones.
+ class Runner {
+ public:
+ virtual ~Runner() = default;
+
+ // Initialize instance of Runner so it can run against Kudu cluster with
+ // the 'master_addresses' RPC endpoints.
+ virtual Status Init(std::vector<std::string> master_addresses) = 0;
+
+ // Load information on the prescribed replica movement operations. Also,
+ // populate helper containers and other auxiliary run-time structures
+ // used by ScheduleNextMove(). This method is called with every batch
+ // of move operations output by the rebalancing algorithm once previously
+ // loaded moves have been scheduled.
+ virtual void LoadMoves(std::vector<Rebalancer::ReplicaMove> replica_moves) = 0;
+
+ // Schedule next replica move. Returns 'true' if replica move operation
+ // has been scheduled successfully; otherwise returns 'false' and sets
+ // the 'has_errors' and 'timed_out' parameters accordingly.
+ virtual bool ScheduleNextMove(bool* has_errors, bool* timed_out) = 0;
+
+ // Update statuses and auxiliary information on in-progress replica move
+ // operations. The 'timed_out' parameter is set to 'true' if not all
+ // in-progress operations were processed by the deadline specified by
+ // the 'deadline_' member field. The method returns 'true' if it's necessary
+ // to clear the state of the in-progress operations, i.e. 'forget'
+ // those, starting from a clean state.
+ virtual bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) = 0;
+
+ virtual Status GetNextMoves(bool* has_moves) = 0;
+
+ virtual uint32_t moves_count() const = 0;
+ }; // class Runner
+
+ friend class KsckResultsToClusterBalanceInfoTest;
+
+ // Given high-level move-some-tablet-replica-for-a-table information from the
+ // rebalancing algorithm, find appropriate tablet replicas to move between the
+ // specified tablet servers. The set of result tablet UUIDs is output
+ // into the 'tablet_ids' container (note: the container is first cleared).
+ // The source and destination replicas are determined by the elements of the
+ // 'tablet_ids' container and tablet server UUIDs TableReplicaMove::from and
+ // TableReplicaMove::to correspondingly. If no suitable tablet replicas are found,
+ // 'tablet_ids' will be empty with the result status of Status::OK().
+ static Status FindReplicas(const TableReplicaMove& move,
+ const ClusterRawInfo& raw_info,
+ std::vector<std::string>* tablet_ids);
+
+ // Filter move operations in 'replica_moves': remove all operations that would
+ // involve moving replicas of tablets which are in 'scheduled_moves'. The
+ // 'replica_moves' cannot be null.
+ static void FilterMoves(const MovesInProgress& scheduled_moves,
+ std::vector<ReplicaMove>* replica_moves);
+
+ // Filter the list of candidate tablets to make sure the location
+ // of the destination server would not contain the majority of replicas
+ // after the move. The 'tablet_ids' is an in-out parameter.
+ static Status FilterCrossLocationTabletCandidates(
+ const std::unordered_map<std::string, std::string>& location_by_ts_id,
+ const TabletsPlacementInfo& placement_info,
+ const TableReplicaMove& move,
+ std::vector<std::string>* tablet_ids);
+
+ // Convert the 'raw' information about the cluster into information suitable
+ // for the input of the high-level rebalancing algorithm.
+ // The 'moves_in_progress' parameter contains information on the replica moves
+ // which have been scheduled by a caller and are still in progress: those are
+ // considered as successfully completed and applied to the 'raw_info' when
+ // building ClusterBalanceInfo for the specified 'raw_info' input. The idea
+ // is to prevent the algorithm outputting the same moves again while some
+ // of the moves recommended at prior steps are still in progress.
+ // The result cluster balance information is output into the 'info' parameter.
+ // The 'info' output parameter cannot be null.
+ Status BuildClusterInfo(const ClusterRawInfo& raw_info,
+ const MovesInProgress& moves_in_progress,
+ ClusterInfo* info) const;
+
+ // Configuration for the rebalancer.
+ const Config config_;
+};
+
+} // namespace rebalance
+} // namespace kudu
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index 09b058a..901a081 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -76,6 +76,7 @@ target_link_libraries(ksck
kudu_tools_util
master
master_proto
+ rebalance
server_base_proto
tserver_proto
tserver_service_proto
@@ -87,14 +88,13 @@ target_link_libraries(ksck
#######################################
add_library(kudu_tools_rebalance
- rebalancer.cc
- rebalance_algo.cc
- placement_policy_util.cc
+ rebalancer_tool.cc
tool_replica_util.cc
)
target_link_libraries(kudu_tools_rebalance
ksck
kudu_common
+ rebalance
${KUDU_BASE_LIBS}
)
@@ -188,9 +188,6 @@ ADD_KUDU_TEST_DEPENDENCIES(kudu-tool-test
ADD_KUDU_TEST(kudu-ts-cli-test)
ADD_KUDU_TEST_DEPENDENCIES(kudu-ts-cli-test
kudu)
-ADD_KUDU_TEST(placement_policy_util-test)
-ADD_KUDU_TEST(rebalance-test)
-ADD_KUDU_TEST(rebalance_algo-test)
ADD_KUDU_TEST(rebalancer_tool-test
NUM_SHARDS 8 PROCESSORS 3
DATA_FILES ../scripts/assign-location.py)
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index 24418ed..f8e7a0a 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -41,6 +41,7 @@
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rebalance/cluster_status.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.pb.h"
@@ -59,18 +60,26 @@ DECLARE_string(color);
DECLARE_string(ksck_format);
DECLARE_uint32(truncate_server_csv_length);
-namespace kudu {
-namespace tools {
+using kudu::cluster_summary::ConsensusConfigType;
+using kudu::cluster_summary::ConsensusState;
+using kudu::cluster_summary::ConsensusStateMap;
+using kudu::cluster_summary::ReplicaSummary;
+using kudu::cluster_summary::ServerHealth;
+using kudu::cluster_summary::ServerHealthSummary;
+using kudu::cluster_summary::TableSummary;
+using kudu::cluster_summary::TabletSummary;
+using kudu::server::GetFlagsResponsePB;
+using kudu::tablet::TabletDataState;
-using server::GetFlagsResponsePB;
using std::ostringstream;
using std::shared_ptr;
using std::static_pointer_cast;
using std::string;
-using std::unordered_map;
using std::vector;
using strings::Substitute;
-using tablet::TabletDataState;
+
+namespace kudu {
+namespace tools {
class MockKsckMaster : public KsckMaster {
public:
@@ -118,13 +127,13 @@ class MockKsckTabletServer : public KsckTabletServer {
explicit MockKsckTabletServer(const string& uuid)
: KsckTabletServer(uuid),
fetch_info_status_(Status::OK()),
- fetch_info_health_(KsckServerHealth::HEALTHY),
+ fetch_info_health_(ServerHealth::HEALTHY),
address_("<mock>") {
version_ = "mock-version";
flags_ = GetFlagsResponsePB{};
}
- Status FetchInfo(KsckServerHealth* health) override {
+ Status FetchInfo(ServerHealth* health) override {
CHECK(health);
*health = fetch_info_health_;
timestamp_ = 12345;
@@ -136,7 +145,7 @@ class MockKsckTabletServer : public KsckTabletServer {
return fetch_info_status_;
}
- Status FetchConsensusState(KsckServerHealth* /*health*/) override {
+ Status FetchConsensusState(ServerHealth* /*health*/) override {
return Status::OK();
}
@@ -168,7 +177,7 @@ class MockKsckTabletServer : public KsckTabletServer {
// Public because the unit tests mutate these variables directly.
Status fetch_info_status_;
- KsckServerHealth fetch_info_health_;
+ ServerHealth fetch_info_health_;
// The fake checksum for replicas on this mock server.
uint64_t checksum_ = 0;
// The fake progress amount for this mock server, used to mock checksum
@@ -251,14 +260,14 @@ class KsckTest : public KuduTest {
protected:
// Returns the expected summary for a table with the given tablet states.
- std::string ExpectedKsckTableSummary(const string& table_name,
- int replication_factor,
- int healthy_tablets,
- int recovering_tablets,
- int underreplicated_tablets,
- int consensus_mismatch_tablets,
- int unavailable_tablets) {
- KsckTableSummary table_summary;
+ std::string ExpectedTableSummary(const string& table_name,
+ int replication_factor,
+ int healthy_tablets,
+ int recovering_tablets,
+ int underreplicated_tablets,
+ int consensus_mismatch_tablets,
+ int unavailable_tablets) {
+ TableSummary table_summary;
table_summary.name = table_name;
table_summary.replication_factor = replication_factor;
table_summary.healthy_tablets = healthy_tablets;
@@ -470,7 +479,7 @@ class KsckTest : public KuduTest {
void CheckJsonVsServerHealthSummaries(
const JsonReader& r,
const string& key,
- const boost::optional<vector<KsckServerHealthSummary>>& summaries) {
+ const boost::optional<vector<ServerHealthSummary>>& summaries) {
if (!summaries || summaries->empty()) {
EXPECT_JSON_FIELD_NOT_PRESENT(r, r.root(), key.c_str());
return;
@@ -490,25 +499,24 @@ void CheckJsonVsServerHealthSummaries(
}
}
-const string KsckConsensusConfigTypeToString(KsckConsensusConfigType t) {
+const string ConsensusConfigTypeToString(ConsensusConfigType t) {
switch (t) {
- case KsckConsensusConfigType::COMMITTED:
+ case ConsensusConfigType::COMMITTED:
return "COMMITTED";
- case KsckConsensusConfigType::PENDING:
+ case ConsensusConfigType::PENDING:
return "PENDING";
- case KsckConsensusConfigType::MASTER:
+ case ConsensusConfigType::MASTER:
return "MASTER";
default:
- LOG(FATAL) << "unknown KsckConsensusConfigType";
+ LOG(FATAL) << "unknown ConsensusConfigType";
}
}
void CheckJsonVsConsensusState(const JsonReader& r,
const rapidjson::Value* cstate,
- const string& ref_id,
- const KsckConsensusState& ref_cstate) {
+ const ConsensusState& ref_cstate) {
EXPECT_JSON_STRING_FIELD(r, cstate, "type",
- KsckConsensusConfigTypeToString(ref_cstate.type));
+ ConsensusConfigTypeToString(ref_cstate.type));
if (ref_cstate.leader_uuid) {
EXPECT_JSON_STRING_FIELD(r, cstate, "leader_uuid", ref_cstate.leader_uuid);
} else {
@@ -554,7 +562,7 @@ void CheckJsonVsConsensusState(const JsonReader& r,
void CheckJsonVsReplicaSummary(const JsonReader& r,
const rapidjson::Value* replica,
- const KsckReplicaSummary& ref_replica) {
+ const ReplicaSummary& ref_replica) {
EXPECT_JSON_STRING_FIELD(r, replica, "ts_uuid", ref_replica.ts_uuid);
if (ref_replica.ts_address) {
EXPECT_JSON_STRING_FIELD(r, replica, "ts_address", ref_replica.ts_address);
@@ -581,7 +589,7 @@ void CheckJsonVsReplicaSummary(const JsonReader& r,
if (ref_replica.consensus_state) {
const rapidjson::Value* cstate;
ASSERT_OK(r.ExtractObject(replica, "consensus_state", &cstate));
- CheckJsonVsConsensusState(r, cstate, ref_replica.ts_uuid, *ref_replica.consensus_state);
+ CheckJsonVsConsensusState(r, cstate, *ref_replica.consensus_state);
} else {
EXPECT_JSON_FIELD_NOT_PRESENT(r, replica, "consensus_state");
}
@@ -589,7 +597,7 @@ void CheckJsonVsReplicaSummary(const JsonReader& r,
void CheckJsonVsMasterConsensus(const JsonReader& r,
bool ref_conflict,
- const boost::optional<KsckConsensusStateMap>& ref_cstates) {
+ const boost::optional<ConsensusStateMap>& ref_cstates) {
if (!ref_cstates || ref_cstates->empty()) {
EXPECT_JSON_FIELD_NOT_PRESENT(r, r.root(), "master_consensus_states");
return;
@@ -600,13 +608,13 @@ void CheckJsonVsMasterConsensus(const JsonReader& r,
cstates, ref_cstates->size());
int i = 0;
for (const auto& entry : *ref_cstates) {
- CheckJsonVsConsensusState(r, cstates[i++], entry.first, entry.second);
+ CheckJsonVsConsensusState(r, cstates[i++], entry.second);
}
}
void CheckJsonVsTableSummaries(const JsonReader& r,
const string& key,
- const boost::optional<vector<KsckTableSummary>>& ref_tables) {
+ const boost::optional<vector<TableSummary>>& ref_tables) {
if (!ref_tables || ref_tables->empty()) {
EXPECT_JSON_FIELD_NOT_PRESENT(r, r.root(), key.c_str());
return;
@@ -619,7 +627,7 @@ void CheckJsonVsTableSummaries(const JsonReader& r,
EXPECT_JSON_STRING_FIELD(r, table, "id", ref_table.id);
EXPECT_JSON_STRING_FIELD(r, table, "name", ref_table.name);
EXPECT_JSON_STRING_FIELD(r, table,
- "health", KsckCheckResultToString(ref_table.TableStatus()));
+ "health", HealthCheckResultToString(ref_table.TableStatus()));
EXPECT_JSON_INT_FIELD(r, table,
"replication_factor", ref_table.replication_factor);
EXPECT_JSON_INT_FIELD(r, table,
@@ -639,7 +647,7 @@ void CheckJsonVsTableSummaries(const JsonReader& r,
void CheckJsonVsTabletSummaries(const JsonReader& r,
const string& key,
- const boost::optional<vector<KsckTabletSummary>>& ref_tablets) {
+ const boost::optional<vector<TabletSummary>>& ref_tablets) {
if (!ref_tablets || ref_tablets->empty()) {
EXPECT_JSON_FIELD_NOT_PRESENT(r, r.root(), key.c_str());
return;
@@ -653,11 +661,11 @@ void CheckJsonVsTabletSummaries(const JsonReader& r,
EXPECT_JSON_STRING_FIELD(r, tablet, "table_id", ref_tablet.table_id);
EXPECT_JSON_STRING_FIELD(r, tablet, "table_name", ref_tablet.table_name);
EXPECT_JSON_STRING_FIELD(r, tablet,
- "health", KsckCheckResultToString(ref_tablet.result));
+ "health", HealthCheckResultToString(ref_tablet.result));
EXPECT_JSON_STRING_FIELD(r, tablet, "status", ref_tablet.status);
const rapidjson::Value* master_cstate;
ASSERT_OK(r.ExtractObject(tablet, "master_cstate", &master_cstate));
- CheckJsonVsConsensusState(r, master_cstate, "master", ref_tablet.master_cstate);
+ CheckJsonVsConsensusState(r, master_cstate, ref_tablet.master_cstate);
if (ref_tablet.replicas.empty()) {
EXPECT_JSON_FIELD_NOT_PRESENT(r, tablet, "replicas");
continue;
@@ -761,14 +769,18 @@ void CheckJsonVsCountSummaries(const JsonReader& r,
vector<const rapidjson::Value*> count_results;
EXTRACT_ARRAY_CHECK_SIZE(r, r.root(), key.c_str(), count_results, 1);
- EXPECT_JSON_INT_FIELD(r, count_results[0], "masters", ref_result->master_summaries.size());
- EXPECT_JSON_INT_FIELD(r, count_results[0], "tservers", ref_result->tserver_summaries.size());
- EXPECT_JSON_INT_FIELD(r, count_results[0], "tables", ref_result->table_summaries.size());
- EXPECT_JSON_INT_FIELD(r, count_results[0], "tablets", ref_result->tablet_summaries.size());
- int replica_count = std::accumulate(ref_result->tablet_summaries.begin(),
- ref_result->tablet_summaries.end(),
+ EXPECT_JSON_INT_FIELD(r, count_results[0], "masters",
+ ref_result->cluster_status.master_summaries.size());
+ EXPECT_JSON_INT_FIELD(r, count_results[0], "tservers",
+ ref_result->cluster_status.tserver_summaries.size());
+ EXPECT_JSON_INT_FIELD(r, count_results[0], "tables",
+ ref_result->cluster_status.table_summaries.size());
+ EXPECT_JSON_INT_FIELD(r, count_results[0], "tablets",
+ ref_result->cluster_status.tablet_summaries.size());
+ int replica_count = std::accumulate(ref_result->cluster_status.tablet_summaries.begin(),
+ ref_result->cluster_status.tablet_summaries.end(),
0,
- [](int acc, const KsckTabletSummary& ts) {
+ [](int acc, const TabletSummary& ts) {
return acc + ts.replicas.size();
});
EXPECT_JSON_INT_FIELD(r, count_results[0], "replicas", replica_count);
@@ -830,32 +842,38 @@ void CheckJsonStringVsKsckResults(const string& json,
r,
"master_summaries",
sections & PrintSections::Values::MASTER_SUMMARIES ?
- boost::optional<vector<KsckServerHealthSummary>>(results.master_summaries) : boost::none);
+ boost::optional<vector<ServerHealthSummary>>
+ (results.cluster_status.master_summaries) : boost::none);
CheckJsonVsMasterConsensus(
r,
- results.master_consensus_conflict,
+ results.cluster_status.master_consensus_conflict,
sections & PrintSections::Values::MASTER_SUMMARIES ?
- boost::optional<KsckConsensusStateMap>(results.master_consensus_state_map) : boost::none);
+ boost::optional<ConsensusStateMap>
+ (results.cluster_status.master_consensus_state_map) : boost::none);
CheckJsonVsServerHealthSummaries(
r,
"tserver_summaries",
sections & PrintSections::Values::TSERVER_SUMMARIES ?
- boost::optional<vector<KsckServerHealthSummary>>(results.tserver_summaries) : boost::none);
+ boost::optional<vector<ServerHealthSummary>>
+ (results.cluster_status.tserver_summaries) : boost::none);
CheckJsonVsVersionSummaries(
r,
"version_summaries",
sections & PrintSections::Values::VERSION_SUMMARIES ?
- boost::optional<KsckVersionToServersMap>(results.version_summaries) : boost::none);
+ boost::optional<KsckVersionToServersMap>
+ (results.version_summaries) : boost::none);
CheckJsonVsTabletSummaries(
r,
"tablet_summaries",
sections & PrintSections::Values::TABLET_SUMMARIES ?
- boost::optional<vector<KsckTabletSummary>>(results.tablet_summaries) : boost::none);
+ boost::optional<vector<TabletSummary>>
+ (results.cluster_status.tablet_summaries) : boost::none);
CheckJsonVsTableSummaries(
r,
"table_summaries",
sections & PrintSections::Values::TABLE_SUMMARIES ?
- boost::optional<vector<KsckTableSummary>>(results.table_summaries) : boost::none);
+ boost::optional<vector<TableSummary>>
+ (results.cluster_status.table_summaries) : boost::none);
CheckJsonVsChecksumResults(
r,
"checksum_results",
@@ -1066,7 +1084,7 @@ TEST_F(KsckTest, TestWrongUUIDTabletServer) {
static_pointer_cast<MockKsckTabletServer>(cluster_->tablet_servers_["ts-id-1"])
->fetch_info_status_ = error;
static_pointer_cast<MockKsckTabletServer>(cluster_->tablet_servers_["ts-id-1"])
- ->fetch_info_health_ = KsckServerHealth::WRONG_SERVER_UUID;
+ ->fetch_info_health_ = ServerHealth::WRONG_SERVER_UUID;
ASSERT_OK(ksck_->CheckClusterRunning());
ASSERT_OK(ksck_->FetchTableAndTabletInfo());
@@ -1091,7 +1109,7 @@ TEST_F(KsckTest, TestBadTabletServer) {
static_pointer_cast<MockKsckTabletServer>(cluster_->tablet_servers_["ts-id-1"])
->fetch_info_status_ = error;
static_pointer_cast<MockKsckTabletServer>(cluster_->tablet_servers_["ts-id-1"])
- ->fetch_info_health_ = KsckServerHealth::UNAVAILABLE;
+ ->fetch_info_health_ = ServerHealth::UNAVAILABLE;
ASSERT_OK(ksck_->CheckClusterRunning());
ASSERT_OK(ksck_->FetchTableAndTabletInfo());
@@ -1208,13 +1226,13 @@ TEST_F(KsckTest, TestOneSmallReplicatedTableWithConsensusState) {
CreateOneSmallReplicatedTable();
ASSERT_OK(RunKsck());
ASSERT_STR_CONTAINS(err_stream_.str(),
- ExpectedKsckTableSummary("test",
- /*replication_factor=*/ 3,
- /*healthy_tablets=*/ 3,
- /*recovering_tablets=*/ 0,
- /*underreplicated_tablets=*/ 0,
- /*consensus_mismatch_tablets=*/ 0,
- /*unavailable_tablets=*/ 0));
+ ExpectedTableSummary("test",
+ /*replication_factor=*/ 3,
+ /*healthy_tablets=*/ 3,
+ /*recovering_tablets=*/ 0,
+ /*underreplicated_tablets=*/ 0,
+ /*consensus_mismatch_tablets=*/ 0,
+ /*unavailable_tablets=*/ 0));
CheckJsonStringVsKsckResults(KsckResultsToJsonString(), ksck_->results());
}
@@ -1244,13 +1262,13 @@ TEST_F(KsckTest, TestConsensusConflictExtraPeer) {
" B | A* B C | 0 | | Yes\n"
" C | A* B C | 0 | | Yes");
ASSERT_STR_CONTAINS(err_str,
- ExpectedKsckTableSummary("test",
- /*replication_factor=*/ 3,
- /*healthy_tablets=*/ 2,
- /*recovering_tablets=*/ 0,
- /*underreplicated_tablets=*/ 0,
- /*consensus_mismatch_tablets=*/ 1,
- /*unavailable_tablets=*/ 0));
+ ExpectedTableSummary("test",
+ /*replication_factor=*/ 3,
+ /*healthy_tablets=*/ 2,
+ /*recovering_tablets=*/ 0,
+ /*underreplicated_tablets=*/ 0,
+ /*consensus_mismatch_tablets=*/ 1,
+ /*unavailable_tablets=*/ 0));
CheckJsonStringVsKsckResults(KsckResultsToJsonString(), ksck_->results());
}
@@ -1277,13 +1295,13 @@ TEST_F(KsckTest, TestConsensusConflictMissingPeer) {
" B | A* B C | 0 | | Yes\n"
" C | A* B C | 0 | | Yes");
ASSERT_STR_CONTAINS(err_stream_.str(),
- ExpectedKsckTableSummary("test",
- /*replication_factor=*/ 3,
- /*healthy_tablets=*/ 2,
- /*recovering_tablets=*/ 0,
- /*underreplicated_tablets=*/ 0,
- /*consensus_mismatch_tablets=*/ 1,
- /*unavailable_tablets=*/ 0));
+ ExpectedTableSummary("test",
+ /*replication_factor=*/ 3,
+ /*healthy_tablets=*/ 2,
+ /*recovering_tablets=*/ 0,
+ /*underreplicated_tablets=*/ 0,
+ /*consensus_mismatch_tablets=*/ 1,
+ /*unavailable_tablets=*/ 0));
CheckJsonStringVsKsckResults(KsckResultsToJsonString(), ksck_->results());
}
@@ -1310,13 +1328,13 @@ TEST_F(KsckTest, TestConsensusConflictDifferentLeader) {
" B | A* B C | 0 | | Yes\n"
" C | A* B C | 0 | | Yes");
ASSERT_STR_CONTAINS(err_stream_.str(),
- ExpectedKsckTableSummary("test",
- /*replication_factor=*/ 3,
- /*healthy_tablets=*/ 2,
- /*recovering_tablets=*/ 0,
- /*underreplicated_tablets=*/ 0,
- /*consensus_mismatch_tablets=*/ 1,
- /*unavailable_tablets=*/ 0));
+ ExpectedTableSummary("test",
+ /*replication_factor=*/ 3,
+ /*healthy_tablets=*/ 2,
+ /*recovering_tablets=*/ 0,
+ /*underreplicated_tablets=*/ 0,
+ /*consensus_mismatch_tablets=*/ 1,
+ /*unavailable_tablets=*/ 0));
CheckJsonStringVsKsckResults(KsckResultsToJsonString(), ksck_->results());
}
@@ -1332,13 +1350,13 @@ TEST_F(KsckTest, TestOneOneTabletBrokenTable) {
"Tablet tablet-id-1 of table 'test' is under-replicated: "
"configuration has 2 replicas vs desired 3");
ASSERT_STR_CONTAINS(err_stream_.str(),
- ExpectedKsckTableSummary("test",
- /*replication_factor=*/ 3,
- /*healthy_tablets=*/ 0,
- /*recovering_tablets=*/ 0,
- /*underreplicated_tablets=*/ 1,
- /*consensus_mismatch_tablets=*/ 0,
- /*unavailable_tablets=*/ 0));
+ ExpectedTableSummary("test",
+ /*replication_factor=*/ 3,
+ /*healthy_tablets=*/ 0,
+ /*recovering_tablets=*/ 0,
+ /*underreplicated_tablets=*/ 1,
+ /*consensus_mismatch_tablets=*/ 0,
+ /*unavailable_tablets=*/ 0));
CheckJsonStringVsKsckResults(KsckResultsToJsonString(), ksck_->results());
}
@@ -1361,13 +1379,13 @@ TEST_F(KsckTest, TestMismatchedAssignments) {
" ts-id-1 (<mock>): RUNNING\n"
" ts-id-2 (<mock>): RUNNING\n");
ASSERT_STR_CONTAINS(err_stream_.str(),
- ExpectedKsckTableSummary("test",
- /*replication_factor=*/ 3,
- /*healthy_tablets=*/ 2,
- /*recovering_tablets=*/ 0,
- /*underreplicated_tablets=*/ 1,
- /*consensus_mismatch_tablets=*/ 0,
- /*unavailable_tablets=*/ 0));
+ ExpectedTableSummary("test",
+ /*replication_factor=*/ 3,
+ /*healthy_tablets=*/ 2,
+ /*recovering_tablets=*/ 0,
+ /*underreplicated_tablets=*/ 1,
+ /*consensus_mismatch_tablets=*/ 0,
+ /*unavailable_tablets=*/ 0));
CheckJsonStringVsKsckResults(KsckResultsToJsonString(), ksck_->results());
}
@@ -1396,13 +1414,13 @@ TEST_F(KsckTest, TestTabletNotRunning) {
" Data state: TABLET_DATA_UNKNOWN\n"
" Last status: \n");
ASSERT_STR_CONTAINS(err_stream_.str(),
- ExpectedKsckTableSummary("test",
- /*replication_factor=*/ 3,
- /*healthy_tablets=*/ 2,
- /*recovering_tablets=*/ 0,
- /*underreplicated_tablets=*/ 0,
- /*consensus_mismatch_tablets=*/ 0,
- /*unavailable_tablets=*/ 1));
+ ExpectedTableSummary("test",
+ /*replication_factor=*/ 3,
+ /*healthy_tablets=*/ 2,
+ /*recovering_tablets=*/ 0,
+ /*underreplicated_tablets=*/ 0,
+ /*consensus_mismatch_tablets=*/ 0,
+ /*unavailable_tablets=*/ 1));
CheckJsonStringVsKsckResults(KsckResultsToJsonString(), ksck_->results());
}
@@ -1422,13 +1440,13 @@ TEST_F(KsckTest, TestTabletCopying) {
ASSERT_EQ("Corruption: table consistency check error: 1 out of 1 table(s) are not healthy",
error_messages[0].ToString());
ASSERT_STR_CONTAINS(err_stream_.str(),
- ExpectedKsckTableSummary("test",
- /*replication_factor=*/ 3,
- /*healthy_tablets=*/ 2,
- /*recovering_tablets=*/ 1,
- /*underreplicated_tablets=*/ 0,
- /*consensus_mismatch_tablets=*/ 0,
- /*unavailable_tablets=*/ 0));
+ ExpectedTableSummary("test",
+ /*replication_factor=*/ 3,
+ /*healthy_tablets=*/ 2,
+ /*recovering_tablets=*/ 1,
+ /*underreplicated_tablets=*/ 0,
+ /*consensus_mismatch_tablets=*/ 0,
+ /*unavailable_tablets=*/ 0));
CheckJsonStringVsKsckResults(KsckResultsToJsonString(), ksck_->results());
}
@@ -1447,13 +1465,13 @@ TEST_F(KsckTest, TestMasterNotReportingTabletServer) {
ASSERT_EQ("Corruption: table consistency check error: 1 out of 1 table(s) are not healthy",
error_messages[0].ToString());
ASSERT_STR_CONTAINS(err_stream_.str(),
- ExpectedKsckTableSummary("test",
- /*replication_factor=*/ 3,
- /*healthy_tablets=*/ 0,
- /*recovering_tablets=*/ 0,
- /*underreplicated_tablets=*/ 3,
- /*consensus_mismatch_tablets=*/ 0,
- /*unavailable_tablets=*/ 0));
+ ExpectedTableSummary("test",
+ /*replication_factor=*/ 3,
+ /*healthy_tablets=*/ 0,
+ /*recovering_tablets=*/ 0,
+ /*underreplicated_tablets=*/ 3,
+ /*consensus_mismatch_tablets=*/ 0,
+ /*unavailable_tablets=*/ 0));
CheckJsonStringVsKsckResults(KsckResultsToJsonString(), ksck_->results());
}
@@ -1486,13 +1504,13 @@ TEST_F(KsckTest, TestMasterNotReportingTabletServerWithConsensusConflict) {
" B | A B* C | 0 | | Yes\n"
" C | A* B C | 0 | | Yes");
ASSERT_STR_CONTAINS(err_stream_.str(),
- ExpectedKsckTableSummary("test",
- /*replication_factor=*/ 3,
- /*healthy_tablets=*/ 0,
- /*recovering_tablets=*/ 0,
- /*underreplicated_tablets=*/ 3,
- /*consensus_mismatch_tablets=*/ 0,
- /*unavailable_tablets=*/ 0));
+ ExpectedTableSummary("test",
+ /*replication_factor=*/ 3,
+ /*healthy_tablets=*/ 0,
+ /*recovering_tablets=*/ 0,
+ /*underreplicated_tablets=*/ 3,
+ /*consensus_mismatch_tablets=*/ 0,
+ /*unavailable_tablets=*/ 0));
CheckJsonStringVsKsckResults(KsckResultsToJsonString(), ksck_->results());
}
@@ -1572,7 +1590,7 @@ TEST_F(KsckTest, TestChecksumWithAllUnhealthyTabletServers) {
for (const auto& entry : cluster_->tablet_servers_) {
auto ts = static_pointer_cast<MockKsckTabletServer>(entry.second);
ts->fetch_info_status_ = Status::NetworkError("gremlins");
- ts->fetch_info_health_ = KsckServerHealth::UNAVAILABLE;
+ ts->fetch_info_health_ = ServerHealth::UNAVAILABLE;
}
// The checksum should short-circuit and fail because no tablet servers are
@@ -1590,7 +1608,7 @@ TEST_F(KsckTest, TestChecksumWithAllPeersUnhealthy) {
for (const auto& entry : cluster_->tablet_servers_) {
auto ts = static_pointer_cast<MockKsckTabletServer>(entry.second);
ts->fetch_info_status_ = Status::NetworkError("gremlins");
- ts->fetch_info_health_ = KsckServerHealth::UNAVAILABLE;
+ ts->fetch_info_health_ = ServerHealth::UNAVAILABLE;
}
const char* const new_uuid = "new";
EmplaceOrDie(&cluster_->tablet_servers_,
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 526e9ab..c66d9bd 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -70,6 +70,16 @@ DEFINE_string(ksck_format, "plain_concise",
DEFINE_bool(consensus, true,
"Whether to check the consensus state from each tablet against the master.");
+using kudu::cluster_summary::HealthCheckResult;
+using kudu::cluster_summary::ConsensusConfigType;
+using kudu::cluster_summary::ConsensusState;
+using kudu::cluster_summary::ConsensusStateMap;
+using kudu::cluster_summary::ReplicaSummary;
+using kudu::cluster_summary::ServerHealth;
+using kudu::cluster_summary::ServerHealthSummary;
+using kudu::cluster_summary::TableSummary;
+using kudu::cluster_summary::TabletSummary;
+
using std::atomic;
using std::cout;
using std::ostream;
@@ -83,13 +93,13 @@ namespace kudu {
namespace tools {
namespace {
-void BuildKsckConsensusStateForConfigMember(const consensus::ConsensusStatePB& cstate,
- KsckConsensusState* ksck_cstate) {
+void BuildConsensusStateForConfigMember(const consensus::ConsensusStatePB& cstate,
+ ConsensusState* ksck_cstate) {
CHECK(ksck_cstate);
ksck_cstate->term = cstate.current_term();
ksck_cstate->type = cstate.has_pending_config() ?
- KsckConsensusConfigType::PENDING :
- KsckConsensusConfigType::COMMITTED;
+ ConsensusConfigType::PENDING :
+ ConsensusConfigType::COMMITTED;
const auto& config = cstate.has_pending_config() ?
cstate.pending_config() :
cstate.committed_config();
@@ -167,12 +177,12 @@ Status Ksck::CheckMasterHealth() {
atomic<size_t> bad_masters(0);
atomic<size_t> unauthorized_masters(0);
- vector<KsckServerHealthSummary> master_summaries;
+ vector<ServerHealthSummary> master_summaries;
simple_spinlock master_summaries_lock;
for (const auto& master : cluster_->masters()) {
RETURN_NOT_OK(pool_->SubmitFunc([&]() {
- KsckServerHealthSummary sh;
+ ServerHealthSummary sh;
Status s = master->FetchInfo().AndThen([&]() {
return master->FetchConsensusState();
});
@@ -182,10 +192,10 @@ Status Ksck::CheckMasterHealth() {
sh.status = s;
if (!s.ok()) {
if (IsNotAuthorizedMethodAccess(s)) {
- sh.health = KsckServerHealth::UNAUTHORIZED;
+ sh.health = ServerHealth::UNAUTHORIZED;
++unauthorized_masters;
} else {
- sh.health = KsckServerHealth::UNAVAILABLE;
+ sh.health = ServerHealth::UNAVAILABLE;
}
++bad_masters;
}
@@ -209,7 +219,7 @@ Status Ksck::CheckMasterHealth() {
}
pool_->Wait();
- results_.master_summaries.swap(master_summaries);
+ results_.cluster_status.master_summaries.swap(master_summaries);
// Return a NotAuthorized status if any master has auth errors, since this
// indicates ksck may not be able to gather full and accurate info.
@@ -230,18 +240,18 @@ Status Ksck::CheckMasterHealth() {
Status Ksck::CheckMasterConsensus() {
// Reset this instance's view of master consensus conflict, in case this
// instance is being used to repeatedly check for master consensus conflict.
- results_.master_consensus_conflict = false;
+ results_.cluster_status.master_consensus_conflict = false;
if (!FLAGS_consensus) {
return Status::OK();
}
- KsckConsensusStateMap master_cstates;
+ ConsensusStateMap master_cstates;
for (const KsckCluster::MasterList::value_type& master : cluster_->masters()) {
if (master->cstate()) {
- KsckConsensusState ksck_cstate;
- BuildKsckConsensusStateForConfigMember(*master->cstate(), &ksck_cstate);
+ ConsensusState ksck_cstate;
+ BuildConsensusStateForConfigMember(*master->cstate(), &ksck_cstate);
InsertOrDie(&master_cstates, master->uuid(), ksck_cstate);
} else {
- results_.master_consensus_conflict = true;
+ results_.cluster_status.master_consensus_conflict = true;
}
}
if (master_cstates.empty()) {
@@ -249,21 +259,21 @@ Status Ksck::CheckMasterConsensus() {
}
// There's no "reference" cstate for masters, so pick an arbitrary master
// cstate to compare with.
- const KsckConsensusState& base = master_cstates.begin()->second;
+ const ConsensusState& base = master_cstates.begin()->second;
for (const auto& entry : master_cstates) {
if (!base.Matches(entry.second)) {
- results_.master_consensus_conflict = true;
+ results_.cluster_status.master_consensus_conflict = true;
break;
}
}
- results_.master_consensus_state_map.swap(master_cstates);
+ results_.cluster_status.master_consensus_state_map.swap(master_cstates);
vector<string> uuids;
std::transform(cluster_->masters().begin(), cluster_->masters().end(),
std::back_inserter(uuids),
[](const shared_ptr<KsckMaster>& master) { return master->uuid(); });
- results_.master_uuids.swap(uuids);
+ results_.cluster_status.master_uuids.swap(uuids);
- if (results_.master_consensus_conflict) {
+ if (results_.cluster_status.master_consensus_conflict) {
return Status::Corruption("there are master consensus conflicts");
}
return Status::OK();
@@ -332,21 +342,21 @@ Status Ksck::FetchInfoFromTabletServers() {
atomic<size_t> unauthorized_servers(0);
VLOG(1) << "Fetching info from all " << servers_count << " tablet servers";
- vector<KsckServerHealthSummary> tablet_server_summaries;
+ vector<ServerHealthSummary> tablet_server_summaries;
simple_spinlock tablet_server_summaries_lock;
for (const auto& entry : cluster_->tablet_servers()) {
const auto& ts = entry.second;
RETURN_NOT_OK(pool_->SubmitFunc([&]() {
VLOG(1) << "Going to connect to tablet server: " << ts->uuid();
- KsckServerHealth health;
+ ServerHealth health;
Status s = ts->FetchInfo(&health).AndThen([&ts, &health]() {
if (FLAGS_consensus) {
return ts->FetchConsensusState(&health);
}
return Status::OK();
});
- KsckServerHealthSummary summary;
+ ServerHealthSummary summary;
summary.uuid = ts->uuid();
summary.address = ts->address();
summary.ts_location = ts->location();
@@ -354,7 +364,7 @@ Status Ksck::FetchInfoFromTabletServers() {
summary.status = s;
if (!s.ok()) {
if (IsNotAuthorizedMethodAccess(s)) {
- health = KsckServerHealth::UNAUTHORIZED;
+ health = ServerHealth::UNAUTHORIZED;
++unauthorized_servers;
}
++bad_servers;
@@ -381,7 +391,7 @@ Status Ksck::FetchInfoFromTabletServers() {
}
pool_->Wait();
- results_.tserver_summaries.swap(tablet_server_summaries);
+ results_.cluster_status.tserver_summaries.swap(tablet_server_summaries);
// Return a NotAuthorized status if any tablet server has auth errors, since
// this indicates ksck may not be able to gather full and accurate info.
@@ -515,13 +525,13 @@ Status Ksck::CheckTabletServerUnusualFlags() {
Status Ksck::CheckServerVersions() {
results_.version_summaries.clear();
- for (const auto& s : results_.master_summaries) {
+ for (const auto& s : results_.cluster_status.master_summaries) {
if (!s.version) continue;
const auto& server = Substitute("master@$0", s.address);
auto& servers = LookupOrInsert(&results_.version_summaries, *s.version, {});
servers.push_back(server);
}
- for (const auto& s : results_.tserver_summaries) {
+ for (const auto& s : results_.cluster_status.tserver_summaries) {
if (!s.version) continue;
const auto& server = Substitute("tserver@$0", s.address);
auto& servers = LookupOrInsert(&results_.version_summaries, *s.version, {});
@@ -572,7 +582,7 @@ Status Ksck::CheckTablesConsistency() {
if (bad_tables_count > 0) {
return Status::Corruption(
Substitute("$0 out of $1 table(s) are not healthy",
- bad_tables_count, results_.table_summaries.size()));
+ bad_tables_count, results_.cluster_status.table_summaries.size()));
}
return Status::OK();
}
@@ -594,7 +604,7 @@ bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) {
return true;
}
- KsckTableSummary ts;
+ TableSummary ts;
ts.id = table->id();
ts.name = table->name();
ts.replication_factor = table->num_replicas();
@@ -603,32 +613,32 @@ bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) {
for (const auto& tablet : table->tablets()) {
auto tablet_result = VerifyTablet(tablet, table->num_replicas());
switch (tablet_result) {
- case KsckCheckResult::HEALTHY:
+ case HealthCheckResult::HEALTHY:
ts.healthy_tablets++;
break;
- case KsckCheckResult::RECOVERING:
+ case HealthCheckResult::RECOVERING:
ts.recovering_tablets++;
break;
- case KsckCheckResult::UNDER_REPLICATED:
+ case HealthCheckResult::UNDER_REPLICATED:
ts.underreplicated_tablets++;
break;
- case KsckCheckResult::CONSENSUS_MISMATCH:
+ case HealthCheckResult::CONSENSUS_MISMATCH:
ts.consensus_mismatch_tablets++;
break;
- case KsckCheckResult::UNAVAILABLE:
+ case HealthCheckResult::UNAVAILABLE:
ts.unavailable_tablets++;
break;
}
}
bool all_healthy = ts.healthy_tablets == ts.TotalTablets();
if (ts.TotalTablets() > 0) {
- results_.table_summaries.push_back(std::move(ts));
+ results_.cluster_status.table_summaries.push_back(std::move(ts));
}
return all_healthy;
}
-KsckCheckResult Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet,
- int table_num_replicas) {
+HealthCheckResult Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet,
+ int table_num_replicas) {
const string tablet_str = Substitute("Tablet $0 of table '$1'",
tablet->id(), tablet->table()->name());
@@ -647,19 +657,19 @@ KsckCheckResult Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet,
non_voter_uuids_from_master.push_back(replica->ts_uuid());
}
}
- KsckConsensusState master_config(KsckConsensusConfigType::MASTER,
- boost::none,
- boost::none,
- leader_uuid,
- voter_uuids_from_master,
- non_voter_uuids_from_master);
+ ConsensusState master_config(ConsensusConfigType::MASTER,
+ boost::none,
+ boost::none,
+ leader_uuid,
+ voter_uuids_from_master,
+ non_voter_uuids_from_master);
int leaders_count = 0;
int running_voters_count = 0;
int copying_replicas_count = 0;
int conflicting_states = 0;
int num_voters = 0;
- vector<KsckReplicaSummary> replicas;
+ vector<ReplicaSummary> replicas;
for (const shared_ptr<KsckTabletReplica>& replica : tablet->replicas()) {
replicas.emplace_back();
auto* repl_info = &replicas.back();
@@ -684,8 +694,8 @@ KsckCheckResult Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet,
std::pair<string, string> tablet_key = std::make_pair(ts->uuid(), tablet->id());
if (ContainsKey(ts->tablet_consensus_state_map(), tablet_key)) {
const auto& cstate = FindOrDieNoPrint(ts->tablet_consensus_state_map(), tablet_key);
- KsckConsensusState ksck_cstate;
- BuildKsckConsensusStateForConfigMember(cstate, &ksck_cstate);
+ ConsensusState ksck_cstate;
+ BuildConsensusStateForConfigMember(cstate, &ksck_cstate);
repl_info->consensus_state = std::move(ksck_cstate);
}
}
@@ -711,40 +721,40 @@ KsckCheckResult Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet,
}
// Determine the overall health state of the tablet.
- KsckCheckResult result = KsckCheckResult::HEALTHY;
+ HealthCheckResult result = HealthCheckResult::HEALTHY;
string status;
int majority_size = consensus::MajoritySize(num_voters);
if (copying_replicas_count > 0) {
- result = KsckCheckResult::RECOVERING;
+ result = HealthCheckResult::RECOVERING;
status = Substitute("$0 is $1: $2 on-going tablet copies",
tablet_str,
Color(AnsiCode::YELLOW, "recovering"),
copying_replicas_count);
} else if (running_voters_count < majority_size) {
- result = KsckCheckResult::UNAVAILABLE;
+ result = HealthCheckResult::UNAVAILABLE;
status = Substitute("$0 is $1: $2 replica(s) not RUNNING",
tablet_str,
Color(AnsiCode::RED, "unavailable"),
num_voters - running_voters_count);
} else if (running_voters_count < num_voters) {
- result = KsckCheckResult::UNDER_REPLICATED;
+ result = HealthCheckResult::UNDER_REPLICATED;
status = Substitute("$0 is $1: $2 replica(s) not RUNNING",
tablet_str,
Color(AnsiCode::YELLOW, "under-replicated"),
num_voters - running_voters_count);
} else if (check_replica_count_ && num_voters < table_num_replicas) {
- result = KsckCheckResult::UNDER_REPLICATED;
+ result = HealthCheckResult::UNDER_REPLICATED;
status = Substitute("$0 is $1: configuration has $2 replicas vs desired $3",
tablet_str,
Color(AnsiCode::YELLOW, "under-replicated"),
num_voters,
table_num_replicas);
} else if (leaders_count != 1) {
- result = KsckCheckResult::UNAVAILABLE;
+ result = HealthCheckResult::UNAVAILABLE;
status = Substitute("$0 is $1: expected one LEADER replica",
tablet_str, Color(AnsiCode::RED, "unavailable"));
} else if (conflicting_states > 0) {
- result = KsckCheckResult::CONSENSUS_MISMATCH;
+ result = HealthCheckResult::CONSENSUS_MISMATCH;
status = Substitute("$0 is $1: $2 replicas' active configs disagree with the "
"leader master's",
tablet_str,
@@ -756,7 +766,7 @@ KsckCheckResult Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet,
Color(AnsiCode::GREEN, "healthy"));
}
- KsckTabletSummary tablet_summary;
+ TabletSummary tablet_summary;
tablet_summary.id = tablet->id();
tablet_summary.table_id = tablet->table()->id();
tablet_summary.table_name = tablet->table()->name();
@@ -764,7 +774,7 @@ KsckCheckResult Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet,
tablet_summary.status = status;
tablet_summary.master_cstate = std::move(master_config);
tablet_summary.replicas.swap(replicas);
- results_.tablet_summaries.push_back(std::move(tablet_summary));
+ results_.cluster_status.tablet_summaries.push_back(std::move(tablet_summary));
return result;
}
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index 2a07b46..4412b6e 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -39,6 +39,7 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rebalance/cluster_status.h" // IWYU pragma: keep
#include "kudu/server/server_base.pb.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.pb.h" // IWYU pragma: keep
@@ -290,14 +291,14 @@ class KsckTabletServer {
// If Status is OK, 'health' will be HEALTHY
// If the UUID is not what ksck expects, 'health' will be WRONG_SERVER_UUID
// Otherwise 'health' will be UNAVAILABLE
- virtual Status FetchInfo(KsckServerHealth* health) = 0;
+ virtual Status FetchInfo(cluster_summary::ServerHealth* health) = 0;
// Connects to the configured tablet server and populates the consensus map. 'health' must not be
// nullptr.
//
// If Status is OK, 'health' will be HEALTHY
// Otherwise 'health' will be UNAVAILABLE
- virtual Status FetchConsensusState(KsckServerHealth* health) = 0;
+ virtual Status FetchConsensusState(cluster_summary::ServerHealth* health) = 0;
// Retrieves "unusual" flags from the KsckTabletServer.
// "Unusual" flags are ones tagged hidden, experimental, or unsafe.
@@ -608,8 +609,9 @@ class Ksck {
const MonoDelta& timeout,
const MonoDelta& retry_interval);
- KsckCheckResult VerifyTablet(const std::shared_ptr<KsckTablet>& tablet,
- int table_num_replicas);
+ cluster_summary::HealthCheckResult VerifyTablet(
+ const std::shared_ptr<KsckTablet>& tablet,
+ int table_num_replicas);
const std::shared_ptr<KsckCluster> cluster_;
gscoped_ptr<ThreadPool> pool_;
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index 3338c77..b5b2912 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -48,6 +48,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.h"
#include "kudu/master/sys_catalog.h"
+#include "kudu/rebalance/cluster_status.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/response_callback.h"
#include "kudu/rpc/rpc_controller.h"
@@ -56,7 +57,6 @@
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tools/ksck.h"
#include "kudu/tools/ksck_checksum.h"
-#include "kudu/tools/ksck_results.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tserver.pb.h"
@@ -81,6 +81,7 @@ using kudu::client::KuduSchema;
using kudu::client::KuduTable;
using kudu::client::KuduTabletServer;
using kudu::client::internal::ReplicaController;
+using kudu::cluster_summary::ServerHealth;
using kudu::rpc::Messenger;
using kudu::rpc::MessengerBuilder;
using kudu::rpc::RpcController;
@@ -187,10 +188,10 @@ Status RemoteKsckTabletServer::Init() {
return Status::OK();
}
-Status RemoteKsckTabletServer::FetchInfo(KsckServerHealth* health) {
+Status RemoteKsckTabletServer::FetchInfo(ServerHealth* health) {
DCHECK(health);
state_ = KsckFetchState::FETCH_FAILED;
- *health = KsckServerHealth::UNAVAILABLE;
+ *health = ServerHealth::UNAVAILABLE;
{
server::GetStatusRequestPB req;
server::GetStatusResponsePB resp;
@@ -201,7 +202,7 @@ Status RemoteKsckTabletServer::FetchInfo(KsckServerHealth* health) {
version_ = resp.status().version_info().version_string();
string response_uuid = resp.status().node_instance().permanent_uuid();
if (response_uuid != uuid()) {
- *health = KsckServerHealth::WRONG_SERVER_UUID;
+ *health = ServerHealth::WRONG_SERVER_UUID;
return Status::RemoteError(Substitute("ID reported by tablet server ($0) doesn't "
"match the expected ID: $1",
response_uuid, uuid()));
@@ -228,7 +229,7 @@ Status RemoteKsckTabletServer::FetchInfo(KsckServerHealth* health) {
RETURN_NOT_OK(FetchCurrentTimestamp());
state_ = KsckFetchState::FETCHED;
- *health = KsckServerHealth::HEALTHY;
+ *health = ServerHealth::HEALTHY;
return Status::OK();
}
@@ -262,9 +263,9 @@ Status RemoteKsckTabletServer::FetchCurrentTimestamp() {
return Status::OK();
}
-Status RemoteKsckTabletServer::FetchConsensusState(KsckServerHealth* health) {
+Status RemoteKsckTabletServer::FetchConsensusState(ServerHealth* health) {
DCHECK(health);
- *health = KsckServerHealth::UNAVAILABLE;
+ *health = ServerHealth::UNAVAILABLE;
tablet_consensus_state_map_.clear();
consensus::GetConsensusStateRequestPB req;
consensus::GetConsensusStateResponsePB resp;
@@ -283,7 +284,7 @@ Status RemoteKsckTabletServer::FetchConsensusState(KsckServerHealth* health) {
}
}
- *health = KsckServerHealth::HEALTHY;
+ *health = ServerHealth::HEALTHY;
return Status::OK();
}
diff --git a/src/kudu/tools/ksck_remote.h b/src/kudu/tools/ksck_remote.h
index 9e348ac..9980871 100644
--- a/src/kudu/tools/ksck_remote.h
+++ b/src/kudu/tools/ksck_remote.h
@@ -38,6 +38,10 @@ namespace client {
class KuduClient;
}
+namespace cluster_summary {
+enum class ServerHealth;
+}
+
namespace consensus {
class ConsensusServiceProxy;
}
@@ -57,8 +61,6 @@ class TabletServerServiceProxy;
namespace tools {
class KsckChecksumManager;
-
-enum class KsckServerHealth;
struct KsckChecksumOptions;
// This implementation connects to a master via RPC.
@@ -104,9 +106,9 @@ class RemoteKsckTabletServer : public KsckTabletServer,
// Must be called after constructing.
Status Init();
- Status FetchInfo(KsckServerHealth* health) override;
+ Status FetchInfo(cluster_summary::ServerHealth* health) override;
- Status FetchConsensusState(KsckServerHealth* health) override;
+ Status FetchConsensusState(cluster_summary::ServerHealth* health) override;
Status FetchUnusualFlags() override;
diff --git a/src/kudu/tools/ksck_results.cc b/src/kudu/tools/ksck_results.cc
index c26d1f9..1ceea49 100644
--- a/src/kudu/tools/ksck_results.cc
+++ b/src/kudu/tools/ksck_results.cc
@@ -23,6 +23,7 @@
#include <map>
#include <memory>
#include <numeric>
+#include <set>
#include <string>
#include <tuple>
#include <type_traits>
@@ -36,6 +37,8 @@
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rebalance/cluster_status.h"
+#include "kudu/tablet/metadata.pb.h"
#include "kudu/tools/color.h"
#include "kudu/tools/tool.pb.h"
#include "kudu/tools/tool_action_common.h"
@@ -48,6 +51,20 @@ DEFINE_uint32(truncate_server_csv_length, 3,
"see more servers with, e.g., unusual flags, at the cost of "
"output with long lines.");
+using kudu::cluster_summary::HealthCheckResult;
+using kudu::cluster_summary::HealthCheckResultToString;
+using kudu::cluster_summary::ConsensusConfigType;
+using kudu::cluster_summary::ConsensusState;
+using kudu::cluster_summary::ConsensusStateMap;
+using kudu::cluster_summary::ReplicaSummary;
+using kudu::cluster_summary::ServerHealth;
+using kudu::cluster_summary::ServerHealthSummary;
+using kudu::cluster_summary::ServerType;
+using kudu::cluster_summary::TableSummary;
+using kudu::cluster_summary::TabletSummary;
+using kudu::cluster_summary::ServerHealthToString;
+using kudu::cluster_summary::ServerTypeToString;
+
using std::endl;
using std::left;
using std::map;
@@ -67,7 +84,8 @@ namespace tools {
namespace {
// Return a formatted string version of 'config', mapping UUIDs to single-character
// labels using the mapping 'label_by_uuid'.
-string format_replicas(const map<string, char>& label_by_uuid, const KsckConsensusState& config) {
+string format_replicas(const map<string, char>& label_by_uuid,
+ const ConsensusState& config) {
constexpr int kPeerWidth = 4;
ostringstream result;
// Sort the output by label for readability.
@@ -107,15 +125,15 @@ void AddToUuidLabelMapping(const std::set<string>& uuids,
}
void AddRowForCState(const string& label,
- const KsckConsensusState& cstate,
- const map<string, char> replica_labels,
+ const ConsensusState& cstate,
+ const map<string, char>& replica_labels,
DataTable* table) {
const string replicas = format_replicas(replica_labels, cstate);
const string opid_index_str = cstate.opid_index ?
std::to_string(*cstate.opid_index) :
"";
const string term = cstate.term ? std::to_string(*cstate.term) : "";
- const string committed = cstate.type == KsckConsensusConfigType::PENDING ?
+ const string committed = cstate.type == ConsensusConfigType::PENDING ?
"No" : "Yes";
table->AddRow({ label, replicas, term, opid_index_str, committed });
}
@@ -123,10 +141,10 @@ void AddRowForCState(const string& label,
// Sort servers by health, from most healthy to least.
// Useful for making unhealthy servers appear at the bottom of output when
// printing out a collection of server health summaries.
-bool ServerByHealthComparator(const KsckServerHealthSummary& left,
- const KsckServerHealthSummary& right) {
- return std::make_tuple(ServerHealthScore(left.health), left.uuid, left.address) <
- std::make_tuple(ServerHealthScore(right.health), right.uuid, right.address);
+bool ServerByHealthComparator(const ServerHealthSummary& left,
+ const ServerHealthSummary& right) {
+ return std::make_tuple(left.health, left.uuid, left.address) <
+ std::make_tuple(right.health, right.uuid, right.address);
}
// Produces a possibly-abbreviated CSV of 'servers':
@@ -153,64 +171,6 @@ string ServerCsv(int server_count, const vector<string>& servers) {
} // anonymous namespace
-const char* const KsckCheckResultToString(KsckCheckResult cr) {
- switch (cr) {
- case KsckCheckResult::HEALTHY:
- return "HEALTHY";
- case KsckCheckResult::RECOVERING:
- return "RECOVERING";
- case KsckCheckResult::UNDER_REPLICATED:
- return "UNDER_REPLICATED";
- case KsckCheckResult::CONSENSUS_MISMATCH:
- return "CONSENSUS_MISMATCH";
- case KsckCheckResult::UNAVAILABLE:
- return "UNAVAILABLE";
- default:
- LOG(FATAL) << "Unknown CheckResult";
- }
-}
-
-const char* const ServerTypeToString(KsckServerType type) {
- switch (type) {
- case KsckServerType::MASTER:
- return "Master";
- case KsckServerType::TABLET_SERVER:
- return "Tablet Server";
- default:
- LOG(FATAL) << "Unknown ServerType";
- }
-}
-
-const char* const ServerHealthToString(KsckServerHealth sh) {
- switch (sh) {
- case KsckServerHealth::HEALTHY:
- return "HEALTHY";
- case KsckServerHealth::UNAUTHORIZED:
- return "UNAUTHORIZED";
- case KsckServerHealth::UNAVAILABLE:
- return "UNAVAILABLE";
- case KsckServerHealth::WRONG_SERVER_UUID:
- return "WRONG_SERVER_UUID";
- default:
- LOG(FATAL) << "Unknown KsckServerHealth";
- }
-}
-
-int ServerHealthScore(KsckServerHealth sh) {
- switch (sh) {
- case KsckServerHealth::HEALTHY:
- return 0;
- case KsckServerHealth::UNAUTHORIZED:
- return 1;
- case KsckServerHealth::UNAVAILABLE:
- return 2;
- case KsckServerHealth::WRONG_SERVER_UUID:
- return 3;
- default:
- LOG(FATAL) << "Unknown KsckServerHealth";
- }
-}
-
Status KsckResults::PrintTo(PrintMode mode, int sections, ostream& out) {
if (mode == PrintMode::JSON_PRETTY || mode == PrintMode::JSON_COMPACT) {
return PrintJsonTo(mode, sections, out);
@@ -218,20 +178,22 @@ Status KsckResults::PrintTo(PrintMode mode, int sections, ostream& out) {
// First, report on the masters and master tablet.
if (sections & PrintSections::MASTER_SUMMARIES) {
- std::sort(master_summaries.begin(), master_summaries.end(), ServerByHealthComparator);
- RETURN_NOT_OK(PrintServerHealthSummaries(KsckServerType::MASTER,
- master_summaries,
+ std::sort(cluster_status.master_summaries.begin(),
+ cluster_status.master_summaries.end(),
+ ServerByHealthComparator);
+ RETURN_NOT_OK(PrintServerHealthSummaries(ServerType::MASTER,
+ cluster_status.master_summaries,
out));
- if (mode == PrintMode::PLAIN_FULL || master_consensus_conflict) {
- RETURN_NOT_OK(PrintConsensusMatrix(master_uuids,
+ if (mode == PrintMode::PLAIN_FULL || cluster_status.master_consensus_conflict) {
+ RETURN_NOT_OK(PrintConsensusMatrix(cluster_status.master_uuids,
boost::none,
- master_consensus_state_map,
+ cluster_status.master_consensus_state_map,
out));
}
out << endl;
- RETURN_NOT_OK(PrintFlagTable(KsckServerType::MASTER,
- master_summaries.size(),
+ RETURN_NOT_OK(PrintFlagTable(ServerType::MASTER,
+ cluster_status.master_summaries.size(),
master_flag_to_servers_map,
master_flag_tags_map,
out));
@@ -242,16 +204,18 @@ Status KsckResults::PrintTo(PrintMode mode, int sections, ostream& out) {
// Then, on the health of the tablet servers.
if (sections & PrintSections::TSERVER_SUMMARIES) {
- std::sort(tserver_summaries.begin(), tserver_summaries.end(), ServerByHealthComparator);
- RETURN_NOT_OK(PrintServerHealthSummaries(KsckServerType::TABLET_SERVER,
- tserver_summaries,
+ std::sort(cluster_status.tserver_summaries.begin(),
+ cluster_status.tserver_summaries.end(),
+ ServerByHealthComparator);
+ RETURN_NOT_OK(PrintServerHealthSummaries(ServerType::TABLET_SERVER,
+ cluster_status.tserver_summaries,
out));
- if (!tserver_summaries.empty()) {
+ if (!cluster_status.tserver_summaries.empty()) {
out << endl;
}
- RETURN_NOT_OK(PrintFlagTable(KsckServerType::TABLET_SERVER,
- tserver_summaries.size(),
+ RETURN_NOT_OK(PrintFlagTable(ServerType::TABLET_SERVER,
+ cluster_status.tserver_summaries.size(),
tserver_flag_to_servers_map,
tserver_flag_tags_map,
out));
@@ -263,30 +227,31 @@ Status KsckResults::PrintTo(PrintMode mode, int sections, ostream& out) {
// Finally, in the "server section", print the version summary.
if (sections & PrintSections::VERSION_SUMMARIES) {
RETURN_NOT_OK(PrintVersionTable(version_summaries,
- master_summaries.size() + tserver_summaries.size(),
+ cluster_status.master_summaries.size()
+ + cluster_status.tserver_summaries.size(),
out));
out << endl;
}
// Then, on each tablet.
if (sections & PrintSections::TABLET_SUMMARIES) {
- RETURN_NOT_OK(PrintTabletSummaries(tablet_summaries, mode, out));
+ RETURN_NOT_OK(PrintTabletSummaries(cluster_status.tablet_summaries, mode, out));
}
// Then, summarize the tablets by table.
// Sort the tables so unhealthy tables are easy to see at the bottom.
if (sections & PrintSections::TABLE_SUMMARIES) {
- std::sort(table_summaries.begin(),
- table_summaries.end(),
- [](const KsckTableSummary &left,
- const KsckTableSummary &right) {
- return std::make_pair(left.TableStatus() != KsckCheckResult::HEALTHY,
+ std::sort(cluster_status.table_summaries.begin(),
+ cluster_status.table_summaries.end(),
+ [](const TableSummary &left,
+ const TableSummary &right) {
+ return std::make_pair(left.TableStatus() != HealthCheckResult::HEALTHY,
left.name) <
- std::make_pair(right.TableStatus() != KsckCheckResult::HEALTHY,
+ std::make_pair(right.TableStatus() != HealthCheckResult::HEALTHY,
right.name);
});
- RETURN_NOT_OK(PrintTableSummaries(table_summaries, out));
- if (!table_summaries.empty()) {
+ RETURN_NOT_OK(PrintTableSummaries(cluster_status.table_summaries, out));
+ if (!cluster_status.table_summaries.empty()) {
out << endl;
}
@@ -341,8 +306,8 @@ Status KsckResults::PrintTo(PrintMode mode, int sections, ostream& out) {
}
Status PrintConsensusMatrix(const vector<string>& server_uuids,
- const boost::optional<KsckConsensusState> ref_cstate,
- const KsckConsensusStateMap& cstates,
+ const boost::optional<ConsensusState>& ref_cstate,
+ const ConsensusStateMap& cstates,
ostream& out) {
map<string, char> replica_labels;
for (const auto& uuid : server_uuids) {
@@ -379,13 +344,13 @@ Status PrintConsensusMatrix(const vector<string>& server_uuids,
return cmatrix.PrintTo(out);
}
-Status PrintServerHealthSummaries(KsckServerType type,
- const vector<KsckServerHealthSummary>& summaries,
+Status PrintServerHealthSummaries(ServerType type,
+ const vector<ServerHealthSummary>& summaries,
ostream& out) {
out << ServerTypeToString(type) << " Summary" << endl;
if (summaries.empty()) return Status::OK();
- if (type == KsckServerType::TABLET_SERVER) {
+ if (type == ServerType::TABLET_SERVER) {
DataTable table({ "UUID", "Address", "Status", "Location" });
unordered_map<string, int> location_counts;
for (const auto& s : summaries) {
@@ -404,7 +369,7 @@ Status PrintServerHealthSummaries(KsckServerType type,
}
RETURN_NOT_OK(loc_stats_table.PrintTo(out));
} else {
- DCHECK_EQ(ServerTypeToString(type), ServerTypeToString(KsckServerType::MASTER));
+ DCHECK_EQ(ServerTypeToString(type), ServerTypeToString(ServerType::MASTER));
DataTable table({ "UUID", "Address", "Status" });
for (const auto& s : summaries) {
table.AddRow({ s.uuid, s.address, ServerHealthToString(s.health) });
@@ -415,14 +380,14 @@ Status PrintServerHealthSummaries(KsckServerType type,
// Print out the status message from each server with bad health.
// This isn't done as part of the table because the messages can be quite long.
for (const auto& s : summaries) {
- if (s.health == KsckServerHealth::HEALTHY) continue;
+ if (s.health == ServerHealth::HEALTHY) continue;
out << Substitute("Error from $0: $1 ($2)", s.address, s.status.ToString(),
ServerHealthToString(s.health)) << endl;
}
return Status::OK();
}
-Status PrintFlagTable(KsckServerType type,
+Status PrintFlagTable(ServerType type,
int num_servers,
const KsckFlagToServersMap& flag_to_servers_map,
const KsckFlagTagsMap& flag_tags_map,
@@ -453,7 +418,7 @@ Status PrintVersionTable(const KsckVersionToServersMap& version_summaries,
return table.PrintTo(out);
}
-Status PrintTableSummaries(const vector<KsckTableSummary>& table_summaries,
+Status PrintTableSummaries(const vector<TableSummary>& table_summaries,
ostream& out) {
if (table_summaries.empty()) {
out << "The cluster doesn't have any matching tables" << endl;
@@ -463,9 +428,10 @@ Status PrintTableSummaries(const vector<KsckTableSummary>& table_summaries,
out << "Summary by table" << endl;
DataTable table({ "Name", "RF", "Status", "Total Tablets",
"Healthy", "Recovering", "Under-replicated", "Unavailable"});
- for (const KsckTableSummary& ts : table_summaries) {
+ for (const TableSummary& ts : table_summaries) {
table.AddRow({ ts.name,
- to_string(ts.replication_factor), KsckCheckResultToString(ts.TableStatus()),
+ to_string(ts.replication_factor),
+ HealthCheckResultToString(ts.TableStatus()),
to_string(ts.TotalTablets()),
to_string(ts.healthy_tablets), to_string(ts.recovering_tablets),
to_string(ts.underreplicated_tablets),
@@ -474,7 +440,7 @@ Status PrintTableSummaries(const vector<KsckTableSummary>& table_summaries,
return table.PrintTo(out);
}
-Status PrintTabletSummaries(const vector<KsckTabletSummary>& tablet_summaries,
+Status PrintTabletSummaries(const vector<TabletSummary>& tablet_summaries,
PrintMode mode,
ostream& out) {
if (tablet_summaries.empty()) {
@@ -484,11 +450,11 @@ Status PrintTabletSummaries(const vector<KsckTabletSummary>& tablet_summaries,
out << "Tablet Summary" << endl;
for (const auto& tablet_summary : tablet_summaries) {
if (mode != PrintMode::PLAIN_FULL &&
- tablet_summary.result == KsckCheckResult::HEALTHY) {
+ tablet_summary.result == HealthCheckResult::HEALTHY) {
continue;
}
out << tablet_summary.status << endl;
- for (const KsckReplicaSummary& r : tablet_summary.replicas) {
+ for (const ReplicaSummary& r : tablet_summary.replicas) {
const char* spec_str = r.is_leader
? " [LEADER]" : (!r.is_voter ? " [NONVOTER]" : "");
@@ -522,16 +488,17 @@ Status PrintTabletSummaries(const vector<KsckTabletSummary>& tablet_summaries,
auto& master_cstate = tablet_summary.master_cstate;
vector<string> ts_uuids;
- for (const KsckReplicaSummary& rs : tablet_summary.replicas) {
+ for (const ReplicaSummary& rs : tablet_summary.replicas) {
ts_uuids.push_back(rs.ts_uuid);
}
- KsckConsensusStateMap consensus_state_map;
- for (const KsckReplicaSummary& rs : tablet_summary.replicas) {
+ ConsensusStateMap consensus_state_map;
+ for (const ReplicaSummary& rs : tablet_summary.replicas) {
if (rs.consensus_state) {
InsertOrDie(&consensus_state_map, rs.ts_uuid, *rs.consensus_state);
}
}
- RETURN_NOT_OK(PrintConsensusMatrix(ts_uuids, master_cstate, consensus_state_map, out));
+ RETURN_NOT_OK(PrintConsensusMatrix(ts_uuids, master_cstate,
+ consensus_state_map, out));
out << endl;
}
return Status::OK();
@@ -545,12 +512,12 @@ Status PrintReplicaCountByTserverSummary(PrintMode mode,
// Collate the counts by tablet servers. We populate the map with the UUIDs
// from 'tserver_summaries' so we don't miss empty tablet servers.
map<string, std::pair<string, int>> host_and_replica_count_by_tserver;
- for (const auto& tserver : results.tserver_summaries) {
+ for (const auto& tserver : results.cluster_status.tserver_summaries) {
EmplaceIfNotPresent(&host_and_replica_count_by_tserver,
tserver.uuid,
std::make_pair(tserver.address, 0));
}
- for (const auto& tablet : results.tablet_summaries) {
+ for (const auto& tablet : results.cluster_status.tablet_summaries) {
for (const auto& replica : tablet.replicas) {
const string address = replica.ts_address ? *replica.ts_address :
"unavailable";
@@ -695,42 +662,42 @@ Status PrintChecksumResults(const KsckChecksumResults& checksum_results,
Status PrintTotalCounts(const KsckResults& results, std::ostream& out) {
// Don't print the results if there's no matching tables.
- if (results.table_summaries.empty()) {
+ if (results.cluster_status.table_summaries.empty()) {
return Status::OK();
}
- int num_replicas = std::accumulate(results.tablet_summaries.begin(),
- results.tablet_summaries.end(),
+ int num_replicas = std::accumulate(results.cluster_status.tablet_summaries.begin(),
+ results.cluster_status.tablet_summaries.end(),
0,
- [](int acc, const KsckTabletSummary& ts) {
+ [](int acc, const TabletSummary& ts) {
return acc + ts.replicas.size();
});
out << "Total Count Summary" << endl;
DataTable totals({ "", "Total Count" });
- totals.AddRow({ "Masters", to_string(results.master_summaries.size()) });
- totals.AddRow({ "Tablet Servers", to_string(results.tserver_summaries.size()) });
- totals.AddRow({ "Tables", to_string(results.table_summaries.size()) });
- totals.AddRow({ "Tablets", to_string(results.tablet_summaries.size()) });
+ totals.AddRow({ "Masters", to_string(results.cluster_status.master_summaries.size()) });
+ totals.AddRow({ "Tablet Servers", to_string(results.cluster_status.tserver_summaries.size()) });
+ totals.AddRow({ "Tables", to_string(results.cluster_status.table_summaries.size()) });
+ totals.AddRow({ "Tablets", to_string(results.cluster_status.tablet_summaries.size()) });
totals.AddRow({ "Replicas", to_string(num_replicas) });
return totals.PrintTo(out);
}
-void KsckServerHealthSummaryToPb(const KsckServerHealthSummary& summary,
- KsckServerHealthSummaryPB* pb) {
+void ServerHealthSummaryToPb(const ServerHealthSummary& summary,
+ ServerHealthSummaryPB* pb) {
switch (summary.health) {
- case KsckServerHealth::HEALTHY:
- pb->set_health(KsckServerHealthSummaryPB_ServerHealth_HEALTHY);
+ case ServerHealth::HEALTHY:
+ pb->set_health(ServerHealthSummaryPB_ServerHealth_HEALTHY);
break;
- case KsckServerHealth::UNAUTHORIZED:
- pb->set_health(KsckServerHealthSummaryPB_ServerHealth_UNAUTHORIZED);
+ case ServerHealth::UNAUTHORIZED:
+ pb->set_health(ServerHealthSummaryPB_ServerHealth_UNAUTHORIZED);
break;
- case KsckServerHealth::UNAVAILABLE:
- pb->set_health(KsckServerHealthSummaryPB_ServerHealth_UNAVAILABLE);
+ case ServerHealth::UNAVAILABLE:
+ pb->set_health(ServerHealthSummaryPB_ServerHealth_UNAVAILABLE);
break;
- case KsckServerHealth::WRONG_SERVER_UUID:
- pb->set_health(KsckServerHealthSummaryPB_ServerHealth_WRONG_SERVER_UUID);
+ case ServerHealth::WRONG_SERVER_UUID:
+ pb->set_health(ServerHealthSummaryPB_ServerHealth_WRONG_SERVER_UUID);
break;
default:
- pb->set_health(KsckServerHealthSummaryPB_ServerHealth_UNKNOWN);
+ pb->set_health(ServerHealthSummaryPB_ServerHealth_UNKNOWN);
}
pb->set_uuid(summary.uuid);
pb->set_address(summary.address);
@@ -743,20 +710,20 @@ void KsckServerHealthSummaryToPb(const KsckServerHealthSummary& summary,
}
}
-void KsckConsensusStateToPb(const KsckConsensusState& cstate,
- KsckConsensusStatePB* pb) {
+void ConsensusStateToPb(const ConsensusState& cstate,
+ ConsensusStatePB* pb) {
switch (cstate.type) {
- case KsckConsensusConfigType::MASTER:
- pb->set_type(KsckConsensusStatePB_ConfigType_MASTER);
+ case ConsensusConfigType::MASTER:
+ pb->set_type(ConsensusStatePB_ConfigType_MASTER);
break;
- case KsckConsensusConfigType::COMMITTED:
- pb->set_type(KsckConsensusStatePB_ConfigType_COMMITTED);
+ case ConsensusConfigType::COMMITTED:
+ pb->set_type(ConsensusStatePB_ConfigType_COMMITTED);
break;
- case KsckConsensusConfigType::PENDING:
- pb->set_type(KsckConsensusStatePB_ConfigType_PENDING);
+ case ConsensusConfigType::PENDING:
+ pb->set_type(ConsensusStatePB_ConfigType_PENDING);
break;
default:
- pb->set_type(KsckConsensusStatePB_ConfigType_UNKNOWN);
+ pb->set_type(ConsensusStatePB_ConfigType_UNKNOWN);
}
if (cstate.term) {
pb->set_term(*cstate.term);
@@ -775,8 +742,8 @@ void KsckConsensusStateToPb(const KsckConsensusState& cstate,
}
}
-void KsckReplicaSummaryToPb(const KsckReplicaSummary& replica,
- KsckReplicaSummaryPB* pb) {
+void ReplicaSummaryToPb(const ReplicaSummary& replica,
+ ReplicaSummaryPB* pb) {
pb->set_ts_uuid(replica.ts_uuid);
if (replica.ts_address) {
pb->set_ts_address(*replica.ts_address);
@@ -789,41 +756,40 @@ void KsckReplicaSummaryToPb(const KsckReplicaSummary& replica,
pb->mutable_status_pb()->CopyFrom(*replica.status_pb);
}
if (replica.consensus_state) {
- KsckConsensusStateToPb(*replica.consensus_state, pb->mutable_consensus_state());
+ ConsensusStateToPb(*replica.consensus_state, pb->mutable_consensus_state());
}
}
-KsckTabletHealthPB KsckTabletHealthToPB(KsckCheckResult c) {
+KsckTabletHealthPB KsckTabletHealthToPB(HealthCheckResult c) {
switch (c) {
- case KsckCheckResult::HEALTHY:
+ case HealthCheckResult::HEALTHY:
return KsckTabletHealthPB::HEALTHY;
- case KsckCheckResult::RECOVERING:
+ case HealthCheckResult::RECOVERING:
return KsckTabletHealthPB::RECOVERING;
- case KsckCheckResult::UNDER_REPLICATED:
+ case HealthCheckResult::UNDER_REPLICATED:
return KsckTabletHealthPB::UNDER_REPLICATED;
- case KsckCheckResult::UNAVAILABLE:
+ case HealthCheckResult::UNAVAILABLE:
return KsckTabletHealthPB::UNAVAILABLE;
- case KsckCheckResult::CONSENSUS_MISMATCH:
+ case HealthCheckResult::CONSENSUS_MISMATCH:
return KsckTabletHealthPB::CONSENSUS_MISMATCH;
default:
return KsckTabletHealthPB::UNKNOWN;
}
}
-void KsckTabletSummaryToPb(const KsckTabletSummary& tablet,
- KsckTabletSummaryPB* pb) {
+void TabletSummaryToPb(const TabletSummary& tablet, TabletSummaryPB* pb) {
pb->set_id(tablet.id);
pb->set_table_id(tablet.table_id);
pb->set_table_name(tablet.table_name);
pb->set_health(KsckTabletHealthToPB(tablet.result));
pb->set_status(tablet.status);
- KsckConsensusStateToPb(tablet.master_cstate, pb->mutable_master_cstate());
+ ConsensusStateToPb(tablet.master_cstate, pb->mutable_master_cstate());
for (const auto& replica : tablet.replicas) {
- KsckReplicaSummaryToPb(replica, pb->add_replicas());
+ ReplicaSummaryToPb(replica, pb->add_replicas());
}
}
-void KsckTableSummaryToPb(const KsckTableSummary& table, KsckTableSummaryPB* pb) {
+void TableSummaryToPb(const TableSummary& table, TableSummaryPB* pb) {
pb->set_id(table.id);
pb->set_name(table.name);
pb->set_health(KsckTabletHealthToPB(table.TableStatus()));
@@ -886,7 +852,7 @@ void KsckVersionSummaryToPb(const std::string& version,
void KsckCountSummaryToPb(int master_count,
int tserver_count,
int table_count,
- const std::vector<KsckTabletSummary>& tablet_summaries,
+ const std::vector<TabletSummary>& tablet_summaries,
KsckCountSummaryPB* pb) {
pb->set_masters(master_count);
pb->set_tservers(tserver_count);
@@ -895,7 +861,7 @@ void KsckCountSummaryToPb(int master_count,
int replica_count = std::accumulate(tablet_summaries.begin(),
tablet_summaries.end(),
0,
- [](int acc, const KsckTabletSummary& ts) {
+ [](int acc, const TabletSummary& ts) {
return acc + ts.replicas.size();
});
pb->set_replicas(replica_count);
@@ -907,27 +873,27 @@ void KsckResults::ToPb(KsckResultsPB* pb, int sections) const {
}
if (sections & PrintSections::MASTER_SUMMARIES) {
- for (const auto &master_summary : master_summaries) {
- KsckServerHealthSummaryToPb(master_summary, pb->add_master_summaries());
+ for (const auto &master_summary : cluster_status.master_summaries) {
+ ServerHealthSummaryToPb(master_summary, pb->add_master_summaries());
}
- for (const auto& master_uuid : master_uuids) {
+ for (const auto& master_uuid : cluster_status.master_uuids) {
pb->add_master_uuids(master_uuid);
}
- pb->set_master_consensus_conflict(master_consensus_conflict);
- for (const auto& entry : master_consensus_state_map) {
- KsckConsensusStateToPb(entry.second, pb->add_master_consensus_states());
+ pb->set_master_consensus_conflict(cluster_status.master_consensus_conflict);
+ for (const auto& entry : cluster_status.master_consensus_state_map) {
+ ConsensusStateToPb(entry.second, pb->add_master_consensus_states());
}
}
if (sections & PrintSections::TSERVER_SUMMARIES) {
- for (const auto &tserver_summary : tserver_summaries) {
- KsckServerHealthSummaryToPb(tserver_summary, pb->add_tserver_summaries());
+ for (const auto &tserver_summary : cluster_status.tserver_summaries) {
+ ServerHealthSummaryToPb(tserver_summary, pb->add_tserver_summaries());
}
}
if (sections & PrintSections::TABLET_SUMMARIES) {
- for (const auto &tablet : tablet_summaries) {
- KsckTabletSummaryToPb(tablet, pb->add_tablet_summaries());
+ for (const auto &tablet : cluster_status.tablet_summaries) {
+ TabletSummaryToPb(tablet, pb->add_tablet_summaries());
}
}
@@ -940,8 +906,8 @@ void KsckResults::ToPb(KsckResultsPB* pb, int sections) const {
}
if (sections & PrintSections::TABLE_SUMMARIES) {
- for (const auto &table : table_summaries) {
- KsckTableSummaryToPb(table, pb->add_table_summaries());
+ for (const auto &table : cluster_status.table_summaries) {
+ TableSummaryToPb(table, pb->add_table_summaries());
}
}
@@ -952,10 +918,10 @@ void KsckResults::ToPb(KsckResultsPB* pb, int sections) const {
}
if (sections & PrintSections::TOTAL_COUNT) {
- KsckCountSummaryToPb(master_summaries.size(),
- tserver_summaries.size(),
- table_summaries.size(),
- tablet_summaries,
+ KsckCountSummaryToPb(cluster_status.master_summaries.size(),
+ cluster_status.tserver_summaries.size(),
+ cluster_status.table_summaries.size(),
+ cluster_status.tablet_summaries,
pb->add_count_summaries());
}
}
diff --git a/src/kudu/tools/ksck_results.h b/src/kudu/tools/ksck_results.h
index f9d1211..34675db 100644
--- a/src/kudu/tools/ksck_results.h
+++ b/src/kudu/tools/ksck_results.h
@@ -19,16 +19,14 @@
#include <cstdint>
#include <iosfwd>
#include <map>
-#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <boost/optional/optional.hpp>
-#include <glog/logging.h>
-#include "kudu/tablet/metadata.pb.h"
+#include "kudu/rebalance/cluster_status.h"
#include "kudu/tablet/tablet.pb.h" // IWYU pragma: keep
#include "kudu/util/status.h"
@@ -37,188 +35,6 @@ namespace tools {
class KsckResultsPB;
-// The result of health check on a tablet.
-// Also used to indicate the health of a table, since the health of a table is
-// the health of its least healthy tablet.
-enum class KsckCheckResult {
- // The tablet is healthy.
- HEALTHY,
-
- // The tablet has on-going tablet copies.
- RECOVERING,
-
- // The tablet has fewer replicas than its table's replication factor and
- // has no on-going tablet copies.
- UNDER_REPLICATED,
-
- // The tablet is missing a majority of its replicas and is unavailable for
- // writes. If a majority cannot be brought back online, then the tablet
- // requires manual intervention to recover.
- UNAVAILABLE,
-
- // There was a discrepancy among the tablets' consensus configs and the master's.
- CONSENSUS_MISMATCH,
-};
-
-const char* const KsckCheckResultToString(KsckCheckResult cr);
-
-// Possible types of consensus configs.
-enum class KsckConsensusConfigType {
- // A config reported by the master.
- MASTER,
- // A config that has been committed.
- COMMITTED,
- // A config that has not yet been committed.
- PENDING,
-};
-
-// Representation of a consensus state.
-struct KsckConsensusState {
- KsckConsensusState() = default;
- KsckConsensusState(KsckConsensusConfigType type,
- boost::optional<int64_t> term,
- boost::optional<int64_t> opid_index,
- boost::optional<std::string> leader_uuid,
- const std::vector<std::string>& voters,
- const std::vector<std::string>& non_voters)
- : type(type),
- term(std::move(term)),
- opid_index(std::move(opid_index)),
- leader_uuid(std::move(leader_uuid)),
- voter_uuids(voters.cbegin(), voters.cend()),
- non_voter_uuids(non_voters.cbegin(), non_voters.cend()) {
- // A consensus state must have a term unless it's one sourced from the master.
- CHECK(type == KsckConsensusConfigType::MASTER || term);
- }
-
- // Two KsckConsensusState structs match if they have the same
- // leader_uuid, the same set of peers, and one of the following holds:
- // - at least one of them is of type MASTER
- // - they are configs of the same type and they have the same term
- bool Matches(const KsckConsensusState &other) const {
- bool same_leader_and_peers =
- leader_uuid == other.leader_uuid &&
- voter_uuids == other.voter_uuids &&
- non_voter_uuids == other.non_voter_uuids;
- if (type == KsckConsensusConfigType::MASTER ||
- other.type == KsckConsensusConfigType::MASTER) {
- return same_leader_and_peers;
- }
- return type == other.type && term == other.term && same_leader_and_peers;
- }
-
- KsckConsensusConfigType type;
- boost::optional<int64_t> term;
- boost::optional<int64_t> opid_index;
- boost::optional<std::string> leader_uuid;
- std::set<std::string> voter_uuids;
- std::set<std::string> non_voter_uuids;
-};
-
-// Represents the health of a server.
-enum class KsckServerHealth {
- // The server is healthy.
- HEALTHY,
-
- // The server rejected attempts to communicate as unauthorized.
- UNAUTHORIZED,
-
- // The server can't be contacted.
- UNAVAILABLE,
-
- // The server reported an unexpected UUID.
- WRONG_SERVER_UUID,
-};
-
-// Return a string representation of 'sh'.
-const char* const ServerHealthToString(KsckServerHealth sh);
-
-// Returns an int signifying the "unhealthiness level" of 'sh'.
-// 0 means healthy; higher values are unhealthier.
-// Useful for sorting or comparing.
-int ServerHealthScore(KsckServerHealth sh);
-
-// A summary of a server health check.
-struct KsckServerHealthSummary {
- std::string uuid;
- std::string address;
- std::string ts_location;
- boost::optional<std::string> version;
- KsckServerHealth health = KsckServerHealth::HEALTHY;
- Status status = Status::OK();
-};
-
-// A summary of the state of a table.
-struct KsckTableSummary {
- std::string id;
- std::string name;
- int replication_factor = 0;
- int healthy_tablets = 0;
- int recovering_tablets = 0;
- int underreplicated_tablets = 0;
- int consensus_mismatch_tablets = 0;
- int unavailable_tablets = 0;
-
- int TotalTablets() const {
- return healthy_tablets + recovering_tablets + underreplicated_tablets +
- consensus_mismatch_tablets + unavailable_tablets;
- }
-
- int UnhealthyTablets() const {
- return TotalTablets() - healthy_tablets;
- }
-
- // Summarize the table's status with a KsckCheckResult.
- // A table's status is determined by the health of the least healthy tablet.
- KsckCheckResult TableStatus() const {
- if (unavailable_tablets > 0) {
- return KsckCheckResult::UNAVAILABLE;
- }
- if (consensus_mismatch_tablets > 0) {
- return KsckCheckResult::CONSENSUS_MISMATCH;
- }
- if (underreplicated_tablets > 0) {
- return KsckCheckResult::UNDER_REPLICATED;
- }
- if (recovering_tablets > 0) {
- return KsckCheckResult::RECOVERING;
- }
- return KsckCheckResult::HEALTHY;
- }
-};
-
-// Types of Kudu servers.
-enum class KsckServerType {
- MASTER,
- TABLET_SERVER,
-};
-
-// Return a string representation of 'type'.
-const char* const ServerTypeToString(KsckServerType type);
-
-// A summary of the state of a tablet replica.
-struct KsckReplicaSummary {
- std::string ts_uuid;
- boost::optional<std::string> ts_address;
- bool ts_healthy = false;
- bool is_leader = false;
- bool is_voter = false;
- tablet::TabletStatePB state = tablet::UNKNOWN;
- boost::optional<tablet::TabletStatusPB> status_pb;
- boost::optional<KsckConsensusState> consensus_state;
-};
-
-// A summary of the state of a tablet.
-struct KsckTabletSummary {
- std::string id;
- std::string table_id;
- std::string table_name;
- KsckCheckResult result;
- std::string status;
- KsckConsensusState master_cstate;
- std::vector<KsckReplicaSummary> replicas;
-};
-
// The result of a checksum on a tablet replica.
struct KsckReplicaChecksum {
std::string ts_address;
@@ -275,8 +91,6 @@ struct PrintSections {
};
};
-typedef std::map<std::string, KsckConsensusState> KsckConsensusStateMap;
-
// A flag and its value.
typedef std::pair<std::string, std::string> KsckFlag;
@@ -291,6 +105,9 @@ typedef std::map<std::string, std::vector<std::string>> KsckVersionToServersMap;
// Container for all the results of a series of ksck checks.
struct KsckResults {
+
+ cluster_summary::ClusterStatus cluster_status;
+
// Collection of error status for failed checks. Used to print out a final
// summary of all failed checks.
// All checks passed if and only if this vector is empty.
@@ -301,10 +118,6 @@ struct KsckResults {
// so they do not cause ksck to report an error.
std::vector<Status> warning_messages;
- // Health summaries for master and tablet servers.
- std::vector<KsckServerHealthSummary> master_summaries;
- std::vector<KsckServerHealthSummary> tserver_summaries;
-
// Version summaries for master and tablet servers.
KsckVersionToServersMap version_summaries;
@@ -314,16 +127,6 @@ struct KsckResults {
KsckFlagToServersMap tserver_flag_to_servers_map;
KsckFlagTagsMap tserver_flag_tags_map;
- // Information about the master consensus configuration.
- std::vector<std::string> master_uuids;
- bool master_consensus_conflict = false;
- KsckConsensusStateMap master_consensus_state_map;
-
- // Detailed information about each table and tablet.
- // Tablet information includes consensus state.
- std::vector<KsckTabletSummary> tablet_summaries;
- std::vector<KsckTableSummary> table_summaries;
-
// Collected results of the checksum scan.
KsckChecksumResults checksum_results;
@@ -341,14 +144,15 @@ struct KsckResults {
// Print a formatted health summary to 'out', given a list `summaries`
// describing the health of servers of type 'type'.
-Status PrintServerHealthSummaries(KsckServerType type,
- const std::vector<KsckServerHealthSummary>& summaries,
- std::ostream& out);
+Status PrintServerHealthSummaries(
+ cluster_summary::ServerType type,
+ const std::vector<cluster_summary::ServerHealthSummary>& summaries,
+ std::ostream& out);
// Print a formatted summary of the flags in 'flag_to_servers_map', indicating
// which servers have which (flag, value) pairs set.
// Flag tag information is sourced from 'flag_tags_map'.
-Status PrintFlagTable(KsckServerType type,
+Status PrintFlagTable(cluster_summary::ServerType type,
int num_servers,
const KsckFlagToServersMap& flag_to_servers_map,
const KsckFlagTagsMap& flag_tags_map,
@@ -362,22 +166,25 @@ Status PrintVersionTable(const KsckVersionToServersMap& version_summaries,
std::ostream& out);
// Print a formatted summary of the tables in 'table_summaries' to 'out'.
-Status PrintTableSummaries(const std::vector<KsckTableSummary>& table_summaries,
- std::ostream& out);
+Status PrintTableSummaries(
+ const std::vector<cluster_summary::TableSummary>& table_summaries,
+ std::ostream& out);
// Print a formatted summary of the tablets in 'tablet_summaries' to 'out'.
-Status PrintTabletSummaries(const std::vector<KsckTabletSummary>& tablet_summaries,
- PrintMode mode,
- std::ostream& out);
+Status PrintTabletSummaries(
+ const std::vector<cluster_summary::TabletSummary>& tablet_summaries,
+ PrintMode mode,
+ std::ostream& out);
// Print to 'out' a "consensus matrix" that compares the consensus states of the
// replicas on servers with ids in 'server_uuids', given the set of consensus
// states in 'consensus_states'. If given, 'ref_cstate' will be used as the
// master's point of view of the consensus state of the tablet.
-Status PrintConsensusMatrix(const std::vector<std::string>& server_uuids,
- const boost::optional<KsckConsensusState> ref_cstate,
- const KsckConsensusStateMap& consensus_states,
- std::ostream& out);
+Status PrintConsensusMatrix(
+ const std::vector<std::string>& server_uuids,
+ const boost::optional<cluster_summary::ConsensusState>& ref_cstate,
+ const cluster_summary::ConsensusStateMap& cstates,
+ std::ostream& out);
// Print to 'out' a table summarizing the counts of tablet replicas in the
// cluster. 'mode' must be PLAIN_FULL or PLAIN_CONCISE. In PLAIN_CONCISE mode,
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer_tool.cc
similarity index 70%
rename from src/kudu/tools/rebalancer.cc
rename to src/kudu/tools/rebalancer_tool.cc
index 6994885..60231e1 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer_tool.cc
@@ -15,14 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-#include "kudu/tools/rebalancer.h"
+#include "kudu/tools/rebalancer_tool.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <iterator>
-#include <limits>
#include <map>
#include <memory>
#include <numeric>
@@ -38,33 +37,43 @@
#include <glog/logging.h>
#include "kudu/client/client.h"
-#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rebalance/cluster_status.h"
+#include "kudu/rebalance/placement_policy_util.h"
+#include "kudu/rebalance/rebalance_algo.h"
+#include "kudu/rebalance/rebalancer.h"
#include "kudu/tools/ksck.h"
#include "kudu/tools/ksck_remote.h"
#include "kudu/tools/ksck_results.h"
-#include "kudu/tools/placement_policy_util.h"
-#include "kudu/tools/rebalance_algo.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/tools/tool_replica_util.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
-using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
+using kudu::cluster_summary::ServerHealth;
+using kudu::cluster_summary::ServerHealthSummary;
+using kudu::cluster_summary::TableSummary;
+using kudu::cluster_summary::TabletSummary;
+using kudu::rebalance::ClusterInfo;
+using kudu::rebalance::ClusterRawInfo;
+using kudu::rebalance::PlacementPolicyViolationInfo;
+using kudu::rebalance::Rebalancer;
+using kudu::rebalance::ServersByCountMap;
+using kudu::rebalance::TableBalanceInfo;
+using kudu::rebalance::TableReplicaMove;
+using kudu::rebalance::TabletsPlacementInfo;
+
using std::accumulate;
using std::endl;
using std::back_inserter;
using std::inserter;
using std::ostream;
using std::map;
-using std::multimap;
-using std::numeric_limits;
using std::pair;
-using std::set_difference;
using std::set;
using std::shared_ptr;
using std::sort;
@@ -79,39 +88,11 @@ using strings::Substitute;
namespace kudu {
namespace tools {
-Rebalancer::Config::Config(
- std::vector<std::string> ignored_tservers_param,
- std::vector<std::string> master_addresses,
- std::vector<std::string> table_filters,
- size_t max_moves_per_server,
- size_t max_staleness_interval_sec,
- int64_t max_run_time_sec,
- bool move_rf1_replicas,
- bool output_replica_distribution_details,
- bool run_policy_fixer,
- bool run_cross_location_rebalancing,
- bool run_intra_location_rebalancing,
- double load_imbalance_threshold)
- : ignored_tservers(ignored_tservers_param.begin(), ignored_tservers_param.end()),
- master_addresses(std::move(master_addresses)),
- table_filters(std::move(table_filters)),
- max_moves_per_server(max_moves_per_server),
- max_staleness_interval_sec(max_staleness_interval_sec),
- max_run_time_sec(max_run_time_sec),
- move_rf1_replicas(move_rf1_replicas),
- output_replica_distribution_details(output_replica_distribution_details),
- run_policy_fixer(run_policy_fixer),
- run_cross_location_rebalancing(run_cross_location_rebalancing),
- run_intra_location_rebalancing(run_intra_location_rebalancing),
- load_imbalance_threshold(load_imbalance_threshold) {
- DCHECK_GE(max_moves_per_server, 0);
-}
-
-Rebalancer::Rebalancer(const Config& config)
- : config_(config) {
+RebalancerTool::RebalancerTool(const Config& config)
+ : Rebalancer(config) {
}
-Status Rebalancer::PrintStats(ostream& out) {
+Status RebalancerTool::PrintStats(ostream& out) {
// First, report on the current balance state of the cluster.
ClusterRawInfo raw_info;
RETURN_NOT_OK(GetClusterRawInfo(boost::none, &raw_info));
@@ -165,7 +146,7 @@ Status Rebalancer::PrintStats(ostream& out) {
return Status::OK();
}
-Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
+Status RebalancerTool::Run(RunStatus* result_status, size_t* moves_count) {
DCHECK(result_status);
*result_status = RunStatus::UNKNOWN;
@@ -260,33 +241,33 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
return Status::OK();
}
-Status Rebalancer::KsckResultsToClusterRawInfo(
+Status RebalancerTool::KsckResultsToClusterRawInfo(
const boost::optional<string>& location,
const KsckResults& ksck_info,
ClusterRawInfo* raw_info) {
DCHECK(raw_info);
// Filter out entities that are not relevant to the specified location.
- vector<KsckServerHealthSummary> tserver_summaries;
- tserver_summaries.reserve(ksck_info.tserver_summaries.size());
+ vector<ServerHealthSummary> tserver_summaries;
+ tserver_summaries.reserve(ksck_info.cluster_status.tserver_summaries.size());
- vector<KsckTabletSummary> tablet_summaries;
- tablet_summaries.reserve(ksck_info.tablet_summaries.size());
+ vector<TabletSummary> tablet_summaries;
+ tablet_summaries.reserve(ksck_info.cluster_status.tablet_summaries.size());
- vector<KsckTableSummary> table_summaries;
+ vector<TableSummary> table_summaries;
table_summaries.reserve(table_summaries.size());
if (!location) {
// Information on the whole cluster.
- tserver_summaries = ksck_info.tserver_summaries;
- tablet_summaries = ksck_info.tablet_summaries;
- table_summaries = ksck_info.table_summaries;
+ tserver_summaries = ksck_info.cluster_status.tserver_summaries;
+ tablet_summaries = ksck_info.cluster_status.tablet_summaries;
+ table_summaries = ksck_info.cluster_status.table_summaries;
} else {
// Information on the specified location only: filter out non-relevant info.
const auto& location_str = *location;
unordered_set<string> ts_ids_at_location;
- for (const auto& summary : ksck_info.tserver_summaries) {
+ for (const auto& summary : ksck_info.cluster_status.tserver_summaries) {
if (summary.ts_location == location_str) {
tserver_summaries.push_back(summary);
InsertOrDie(&ts_ids_at_location, summary.uuid);
@@ -294,7 +275,7 @@ Status Rebalancer::KsckResultsToClusterRawInfo(
}
unordered_set<string> table_ids_at_location;
- for (const auto& summary : ksck_info.tablet_summaries) {
+ for (const auto& summary : ksck_info.cluster_status.tablet_summaries) {
const auto& replicas = summary.replicas;
decltype(summary.replicas) replicas_at_location;
replicas_at_location.reserve(replicas.size());
@@ -310,7 +291,7 @@ Status Rebalancer::KsckResultsToClusterRawInfo(
tablet_summaries.back().replicas = std::move(replicas_at_location);
}
- for (const auto& summary : ksck_info.table_summaries) {
+ for (const auto& summary : ksck_info.cluster_status.table_summaries) {
if (ContainsKey(table_ids_at_location, summary.id)) {
table_summaries.push_back(summary);
}
@@ -324,188 +305,8 @@ Status Rebalancer::KsckResultsToClusterRawInfo(
return Status::OK();
}
-// Given high-level description of moves, find tablets with replicas at the
-// corresponding tablet servers to satisfy those high-level descriptions.
-// The idea is to find all tablets of the specified table that would have a
-// replica at the source server, but would not have a replica at the destination
-// server. That is to satisfy the restriction of having no more than one replica
-// of the same tablet per server.
-//
-// An additional constraint: it's better not to move leader replicas, if
-// possible. If a client has a write operation in progress, moving leader
-// replicas of affected tablets would make the client to re-resolve new leaders
-// and retry the operations. Moving leader replicas is used as last resort
-// when no other candidates are left.
-Status Rebalancer::FindReplicas(const TableReplicaMove& move,
- const ClusterRawInfo& raw_info,
- vector<string>* tablet_ids) {
- const auto& table_id = move.table_id;
-
- // Tablet ids of replicas on the source tserver that are non-leaders.
- vector<string> tablet_uuids_src;
- // Tablet ids of replicas on the source tserver that are leaders.
- vector<string> tablet_uuids_src_leaders;
- // UUIDs of tablets of the selected table at the destination tserver.
- vector<string> tablet_uuids_dst;
-
- for (const auto& tablet_summary : raw_info.tablet_summaries) {
- if (tablet_summary.table_id != table_id) {
- continue;
- }
- if (tablet_summary.result != KsckCheckResult::HEALTHY) {
- VLOG(1) << Substitute("table $0: not considering replicas of tablet $1 "
- "as candidates for movement since the tablet's "
- "status is '$2'",
- table_id, tablet_summary.id,
- KsckCheckResultToString(tablet_summary.result));
- continue;
- }
- for (const auto& replica_summary : tablet_summary.replicas) {
- if (replica_summary.ts_uuid != move.from &&
- replica_summary.ts_uuid != move.to) {
- continue;
- }
- if (!replica_summary.ts_healthy) {
- VLOG(1) << Substitute("table $0: not considering replica movement "
- "from $1 to $2 since server $3 is not healthy",
- table_id,
- move.from, move.to, replica_summary.ts_uuid);
- continue;
- }
- if (replica_summary.ts_uuid == move.from) {
- if (replica_summary.is_leader) {
- tablet_uuids_src_leaders.emplace_back(tablet_summary.id);
- } else {
- tablet_uuids_src.emplace_back(tablet_summary.id);
- }
- } else {
- DCHECK_EQ(move.to, replica_summary.ts_uuid);
- tablet_uuids_dst.emplace_back(tablet_summary.id);
- }
- }
- }
- sort(tablet_uuids_src.begin(), tablet_uuids_src.end());
- sort(tablet_uuids_dst.begin(), tablet_uuids_dst.end());
-
- vector<string> tablet_uuids;
- set_difference(
- tablet_uuids_src.begin(), tablet_uuids_src.end(),
- tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
- inserter(tablet_uuids, tablet_uuids.begin()));
-
- if (!tablet_uuids.empty()) {
- // If there are tablets with non-leader replicas at the source server,
- // those are the best candidates for movement.
- tablet_ids->swap(tablet_uuids);
- return Status::OK();
- }
-
- // If no tablets with non-leader replicas were found, resort to tablets with
- // leader replicas at the source server.
- DCHECK(tablet_uuids.empty());
- sort(tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end());
- set_difference(
- tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end(),
- tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
- inserter(tablet_uuids, tablet_uuids.begin()));
-
- tablet_ids->swap(tablet_uuids);
-
- return Status::OK();
-}
-
-void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves,
- vector<ReplicaMove>* replica_moves) {
- unordered_set<string> tablet_uuids;
- vector<ReplicaMove> filtered_replica_moves;
- for (auto& move_op : *replica_moves) {
- const auto& tablet_uuid = move_op.tablet_uuid;
- if (ContainsKey(scheduled_moves, tablet_uuid)) {
- // There is a move operation in progress for the tablet, don't schedule
- // another one.
- continue;
- }
- if (PREDICT_TRUE(tablet_uuids.emplace(tablet_uuid).second)) {
- filtered_replica_moves.emplace_back(std::move(move_op));
- } else {
- // Rationale behind the unique tablet constraint: the implementation of
- // the Run() method is designed to re-order operations suggested by the
- // high-level algorithm to use the op-count-per-tablet-server capacity
- // as much as possible. Right now, the RunStep() method outputs only one
- // move operation per tablet in every batch. The code below is to
- // enforce the contract between Run() and RunStep() methods.
- LOG(DFATAL) << "detected multiple replica move operations for the same "
- "tablet " << tablet_uuid;
- }
- }
- *replica_moves = std::move(filtered_replica_moves);
-}
-
-Status Rebalancer::FilterCrossLocationTabletCandidates(
- const unordered_map<string, string>& location_by_ts_id,
- const TabletsPlacementInfo& placement_info,
- const TableReplicaMove& move,
- vector<string>* tablet_ids) {
- DCHECK(tablet_ids);
-
- if (tablet_ids->empty()) {
- // Nothing to filter.
- return Status::OK();
- }
-
- const auto& dst_location = FindOrDie(location_by_ts_id, move.to);
- const auto& src_location = FindOrDie(location_by_ts_id, move.from);
-
- // Sanity check: the source and the destination tablet servers should be
- // in different locations.
- if (src_location == dst_location) {
- return Status::InvalidArgument(Substitute(
- "moving replicas of table $0: the same location '$1' for both "
- "the source ($2) and the destination ($3) tablet servers",
- move.table_id, src_location, move.from, move.to));
- }
- if (dst_location.empty()) {
- // The destination location is not specified, so no restrictions on the
- // destination location to check for.
- return Status::OK();
- }
-
- vector<string> tablet_ids_filtered;
- for (auto& tablet_id : *tablet_ids) {
- const auto& replica_count_info = FindOrDie(
- placement_info.tablet_location_info, tablet_id);
- const auto* count_ptr = FindOrNull(replica_count_info, dst_location);
- if (count_ptr == nullptr) {
- // Nothing else to clarify: not a single replica in the destination
- // location for this candidate tablet.
- tablet_ids_filtered.emplace_back(std::move(tablet_id));
- continue;
- }
- const auto location_replica_num = *count_ptr;
- const auto& table_id = FindOrDie(placement_info.tablet_to_table_id, tablet_id);
- const auto& table_info = FindOrDie(placement_info.tables_info, table_id);
- const auto rf = table_info.replication_factor;
- // In case of RF=2*N+1, losing (N + 1) replicas means losing the majority.
- // In case of RF=2*N, losing at least N replicas means losing the majority.
- const auto replica_num_threshold = rf % 2 ? consensus::MajoritySize(rf)
- : rf / 2;
- if (location_replica_num + 1 >= replica_num_threshold) {
- VLOG(1) << Substitute("destination location '$0' for candidate tablet $1 "
- "already contains $2 of $3 replicas",
- dst_location, tablet_id, location_replica_num, rf);
- continue;
- }
- // No majority of replicas in the destination location: it's OK candidate.
- tablet_ids_filtered.emplace_back(std::move(tablet_id));
- }
-
- *tablet_ids = std::move(tablet_ids_filtered);
-
- return Status::OK();
-}
-
-Status Rebalancer::PrintCrossLocationBalanceStats(const ClusterInfo& ci,
- ostream& out) const {
+Status RebalancerTool::PrintCrossLocationBalanceStats(const ClusterInfo& ci,
+ ostream& out) const {
// Print location load information.
map<string, int64_t> replicas_num_by_location;
for (const auto& elem : ci.balance.servers_by_total_replica_count) {
@@ -528,10 +329,10 @@ Status Rebalancer::PrintCrossLocationBalanceStats(const ClusterInfo& ci,
return Status::OK();
}
-Status Rebalancer::PrintLocationBalanceStats(const string& location,
- const ClusterRawInfo& raw_info,
- const ClusterInfo& ci,
- ostream& out) const {
+Status RebalancerTool::PrintLocationBalanceStats(const string& location,
+ const ClusterRawInfo& raw_info,
+ const ClusterInfo& ci,
+ ostream& out) const {
if (!location.empty()) {
out << "--------------------------------------------------" << endl;
out << "Location: " << location << endl;
@@ -609,7 +410,7 @@ Status Rebalancer::PrintLocationBalanceStats(const string& location,
if (config_.output_replica_distribution_details) {
const auto& table_summaries = raw_info.table_summaries;
- unordered_map<string, const KsckTableSummary*> table_info;
+ unordered_map<string, const TableSummary*> table_info;
for (const auto& summary : table_summaries) {
table_info.emplace(summary.id, &summary);
}
@@ -638,8 +439,8 @@ Status Rebalancer::PrintLocationBalanceStats(const string& location,
return Status::OK();
}
-Status Rebalancer::PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
- ostream& out) const {
+Status RebalancerTool::PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
+ ostream& out) const {
TabletsPlacementInfo placement_info;
RETURN_NOT_OK(BuildTabletsPlacementInfo(
raw_info, MovesInProgress(), &placement_info));
@@ -695,173 +496,7 @@ Status Rebalancer::PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
return Status::OK();
}
-Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
- const MovesInProgress& moves_in_progress,
- ClusterInfo* info) const {
- DCHECK(info);
-
- // tserver UUID --> total replica count of all table's tablets at the server
- typedef unordered_map<string, int32_t> TableReplicasAtServer;
-
- // The result information to build.
- ClusterInfo result_info;
-
- unordered_map<string, int32_t> tserver_replicas_count;
- unordered_map<string, TableReplicasAtServer> table_replicas_info;
-
- // Build a set of tables with RF=1 (single replica tables).
- unordered_set<string> rf1_tables;
- if (!config_.move_rf1_replicas) {
- for (const auto& s : raw_info.table_summaries) {
- if (s.replication_factor == 1) {
- rf1_tables.emplace(s.id);
- }
- }
- }
-
- auto& ts_uuids_by_location = result_info.locality.servers_by_location;
- auto& location_by_ts_uuid = result_info.locality.location_by_ts_id;
- for (const auto& summary : raw_info.tserver_summaries) {
- const auto& ts_id = summary.uuid;
- const auto& ts_location = summary.ts_location;
- VLOG(1) << Substitute("found tserver $0 at location '$1'", ts_id, ts_location);
- EmplaceOrDie(&location_by_ts_uuid, ts_id, ts_location);
- auto& ts_ids = LookupOrEmplace(&ts_uuids_by_location,
- ts_location, set<string>());
- InsertOrDie(&ts_ids, ts_id);
- }
-
- for (const auto& s : raw_info.tserver_summaries) {
- if (s.health != KsckServerHealth::HEALTHY) {
- LOG(INFO) << Substitute("skipping tablet server $0 ($1) because of its "
- "non-HEALTHY status ($2)",
- s.uuid, s.address,
- ServerHealthToString(s.health));
- continue;
- }
- tserver_replicas_count.emplace(s.uuid, 0);
- }
-
- for (const auto& tablet : raw_info.tablet_summaries) {
- if (!config_.move_rf1_replicas) {
- if (rf1_tables.find(tablet.table_id) != rf1_tables.end()) {
- LOG(INFO) << Substitute("tablet $0 of table '$1' ($2) has single replica, skipping",
- tablet.id, tablet.table_name, tablet.table_id);
- continue;
- }
- }
-
- // Check if it's one of the tablets which are currently being rebalanced.
- // If so, interpret the move as successfully completed, updating the
- // replica counts correspondingly.
- const auto it_pending_moves = moves_in_progress.find(tablet.id);
- if (it_pending_moves != moves_in_progress.end()) {
- const auto& move_info = it_pending_moves->second;
- bool is_target_replica_present = false;
- // Verify that the target replica is present in the config.
- for (const auto& tr : tablet.replicas) {
- if (tr.ts_uuid == move_info.ts_uuid_to) {
- is_target_replica_present = true;
- break;
- }
- }
- // If the target replica is present, it will be processed in the code
- // below. Otherwise, it's necessary to pretend as if the target replica
- // is in the config already: the idea is to count in the absent target
- // replica as if the movement has successfully completed already.
- auto it = tserver_replicas_count.find(move_info.ts_uuid_to);
- if (!is_target_replica_present && !move_info.ts_uuid_to.empty() &&
- it != tserver_replicas_count.end()) {
- it->second++;
- auto table_ins = table_replicas_info.emplace(
- tablet.table_id, TableReplicasAtServer());
- TableReplicasAtServer& replicas_at_server = table_ins.first->second;
-
- auto replicas_ins = replicas_at_server.emplace(move_info.ts_uuid_to, 0);
- replicas_ins.first->second++;
- }
- }
-
- for (const auto& ri : tablet.replicas) {
- // Increment total count of replicas at the tablet server.
- auto it = tserver_replicas_count.find(ri.ts_uuid);
- if (it == tserver_replicas_count.end()) {
- string msg = Substitute("skipping replica at tserver $0", ri.ts_uuid);
- if (ri.ts_address) {
- msg += " (" + *ri.ts_address + ")";
- }
- msg += " since it's not reported among known tservers";
- LOG(INFO) << msg;
- continue;
- }
- bool do_count_replica = true;
- if (it_pending_moves != moves_in_progress.end()) {
- const auto& move_info = it_pending_moves->second;
- if (move_info.ts_uuid_from == ri.ts_uuid) {
- DCHECK(!ri.ts_uuid.empty());
- // The source replica of the scheduled replica movement operation
- // is still in the config. Interpreting the move as successfully
- // completed, so the source replica should not be counted in.
- do_count_replica = false;
- }
- }
- if (do_count_replica) {
- it->second++;
- }
-
- auto table_ins = table_replicas_info.emplace(
- tablet.table_id, TableReplicasAtServer());
- TableReplicasAtServer& replicas_at_server = table_ins.first->second;
-
- auto replicas_ins = replicas_at_server.emplace(ri.ts_uuid, 0);
- if (do_count_replica) {
- replicas_ins.first->second++;
- }
- }
- }
-
- // Check for the consistency of information derived from the ksck report.
- for (const auto& elem : tserver_replicas_count) {
- const auto& ts_uuid = elem.first;
- int32_t count_by_table_info = 0;
- for (auto& e : table_replicas_info) {
- count_by_table_info += e.second[ts_uuid];
- }
- if (elem.second != count_by_table_info) {
- return Status::Corruption("inconsistent cluster state returned by ksck");
- }
- }
-
- // Populate ClusterBalanceInfo::servers_by_total_replica_count
- auto& servers_by_count = result_info.balance.servers_by_total_replica_count;
- for (const auto& elem : tserver_replicas_count) {
- servers_by_count.emplace(elem.second, elem.first);
- }
-
- // Populate ClusterBalanceInfo::table_info_by_skew
- auto& table_info_by_skew = result_info.balance.table_info_by_skew;
- for (const auto& elem : table_replicas_info) {
- const auto& table_id = elem.first;
- int32_t max_count = numeric_limits<int32_t>::min();
- int32_t min_count = numeric_limits<int32_t>::max();
- TableBalanceInfo tbi;
- tbi.table_id = table_id;
- for (const auto& e : elem.second) {
- const auto& ts_uuid = e.first;
- const auto replica_count = e.second;
- tbi.servers_by_replica_count.emplace(replica_count, ts_uuid);
- max_count = std::max(replica_count, max_count);
- min_count = std::min(replica_count, min_count);
- }
- table_info_by_skew.emplace(max_count - min_count, std::move(tbi));
- }
- // TODO(aserbin): add sanity checks on the result.
- *info = std::move(result_info);
-
- return Status::OK();
-}
-
-Status Rebalancer::RunWith(Runner* runner, RunStatus* result_status) {
+Status RebalancerTool::RunWith(Runner* runner, RunStatus* result_status) {
const MonoDelta max_staleness_delta =
MonoDelta::FromSeconds(config_.max_staleness_interval_sec);
MonoTime staleness_start = MonoTime::Now();
@@ -938,13 +573,13 @@ Status Rebalancer::RunWith(Runner* runner, RunStatus* result_status) {
return Status::OK();
}
-Status Rebalancer::GetClusterRawInfo(const boost::optional<string>& location,
- ClusterRawInfo* raw_info) {
+Status RebalancerTool::GetClusterRawInfo(const boost::optional<string>& location,
+ ClusterRawInfo* raw_info) {
RETURN_NOT_OK(RefreshKsckResults());
return KsckResultsToClusterRawInfo(location, ksck_->results(), raw_info);
}
-Status Rebalancer::RefreshKsckResults() {
+Status RebalancerTool::RefreshKsckResults() {
shared_ptr<KsckCluster> cluster;
RETURN_NOT_OK_PREPEND(
RemoteKsckCluster::Build(config_.master_addresses, &cluster),
@@ -955,10 +590,10 @@ Status Rebalancer::RefreshKsckResults() {
return Status::OK();
}
-Rebalancer::BaseRunner::BaseRunner(Rebalancer* rebalancer,
- std::unordered_set<std::string> ignored_tservers,
- size_t max_moves_per_server,
- boost::optional<MonoTime> deadline)
+RebalancerTool::BaseRunner::BaseRunner(RebalancerTool* rebalancer,
+ std::unordered_set<std::string> ignored_tservers,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline)
: rebalancer_(rebalancer),
ignored_tservers_(std::move(ignored_tservers)),
max_moves_per_server_(max_moves_per_server),
@@ -967,7 +602,7 @@ Rebalancer::BaseRunner::BaseRunner(Rebalancer* rebalancer,
CHECK(rebalancer_);
}
-Status Rebalancer::BaseRunner::Init(vector<string> master_addresses) {
+Status RebalancerTool::BaseRunner::Init(vector<string> master_addresses) {
DCHECK_EQ(0, moves_count_);
DCHECK(op_count_per_ts_.empty());
DCHECK(ts_per_op_count_.empty());
@@ -979,8 +614,8 @@ Status Rebalancer::BaseRunner::Init(vector<string> master_addresses) {
.Build(&client_);
}
-Status Rebalancer::BaseRunner::GetNextMoves(bool* has_moves) {
- vector<ReplicaMove> replica_moves;
+Status RebalancerTool::BaseRunner::GetNextMoves(bool* has_moves) {
+ vector<Rebalancer::ReplicaMove> replica_moves;
RETURN_NOT_OK(GetNextMovesImpl(&replica_moves));
if (replica_moves.empty() && scheduled_moves_.empty()) {
*has_moves = false;
@@ -1006,7 +641,7 @@ Status Rebalancer::BaseRunner::GetNextMoves(bool* has_moves) {
return Status::OK();
}
-void Rebalancer::BaseRunner::UpdateOnMoveCompleted(const string& ts_uuid) {
+void RebalancerTool::BaseRunner::UpdateOnMoveCompleted(const string& ts_uuid) {
const auto op_count = op_count_per_ts_[ts_uuid]--;
const auto op_range = ts_per_op_count_.equal_range(op_count);
bool ts_per_op_count_updated = false;
@@ -1021,8 +656,8 @@ void Rebalancer::BaseRunner::UpdateOnMoveCompleted(const string& ts_uuid) {
DCHECK(ts_per_op_count_updated);
}
-Rebalancer::AlgoBasedRunner::AlgoBasedRunner(
- Rebalancer* rebalancer,
+RebalancerTool::AlgoBasedRunner::AlgoBasedRunner(
+ RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
boost::optional<MonoTime> deadline)
@@ -1033,14 +668,14 @@ Rebalancer::AlgoBasedRunner::AlgoBasedRunner(
random_generator_(random_device_()) {
}
-Status Rebalancer::AlgoBasedRunner::Init(vector<string> master_addresses) {
+Status RebalancerTool::AlgoBasedRunner::Init(vector<string> master_addresses) {
DCHECK(src_op_indices_.empty());
DCHECK(dst_op_indices_.empty());
DCHECK(scheduled_moves_.empty());
return BaseRunner::Init(std::move(master_addresses));
}
-void Rebalancer::AlgoBasedRunner::LoadMoves(vector<ReplicaMove> replica_moves) {
+void RebalancerTool::AlgoBasedRunner::LoadMoves(vector<Rebalancer::ReplicaMove> replica_moves) {
// The moves to schedule (used by subsequent calls to ScheduleNextMove()).
replica_moves_.swap(replica_moves);
@@ -1105,8 +740,8 @@ void Rebalancer::AlgoBasedRunner::LoadMoves(vector<ReplicaMove> replica_moves) {
}
}
-bool Rebalancer::AlgoBasedRunner::ScheduleNextMove(bool* has_errors,
- bool* timed_out) {
+bool RebalancerTool::AlgoBasedRunner::ScheduleNextMove(bool* has_errors,
+ bool* timed_out) {
DCHECK(has_errors);
DCHECK(timed_out);
*has_errors = false;
@@ -1157,7 +792,7 @@ bool Rebalancer::AlgoBasedRunner::ScheduleNextMove(bool* has_errors,
return false;
}
-bool Rebalancer::AlgoBasedRunner::UpdateMovesInProgressStatus(
+bool RebalancerTool::AlgoBasedRunner::UpdateMovesInProgressStatus(
bool* has_errors, bool* timed_out) {
DCHECK(has_errors);
DCHECK(timed_out);
@@ -1220,8 +855,8 @@ bool Rebalancer::AlgoBasedRunner::UpdateMovesInProgressStatus(
// Run one step of the rebalancer. Due to the inherent restrictions of the
// rebalancing engine, no more than one replica per tablet is moved during
// one step of the rebalancing.
-Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
- vector<ReplicaMove>* replica_moves) {
+Status RebalancerTool::AlgoBasedRunner::GetNextMovesImpl(
+ vector<Rebalancer::ReplicaMove>* replica_moves) {
const auto& loc = location();
ClusterRawInfo raw_info;
RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(loc, &raw_info));
@@ -1231,7 +866,7 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
// Otherwise, the rebalancing might interfere with the
// automatic re-replication or get unexpected errors while moving replicas.
for (const auto& s : raw_info.tserver_summaries) {
- if (s.health != KsckServerHealth::HEALTHY) {
+ if (s.health != ServerHealth::HEALTHY) {
if (ContainsKey(ignored_tservers_, s.uuid)) {
continue;
}
@@ -1290,7 +925,7 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
unordered_set<string> tablets_in_move;
transform(scheduled_moves_.begin(), scheduled_moves_.end(),
inserter(tablets_in_move, tablets_in_move.begin()),
- [](const MovesInProgress::value_type& elem) {
+ [](const Rebalancer::MovesInProgress::value_type& elem) {
return elem.first;
});
for (const auto& move : moves) {
@@ -1323,7 +958,7 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
"from server $1 to server $2", move.table_id, move.from, move.to);
continue;
}
- ReplicaMove move_info;
+ Rebalancer::ReplicaMove move_info;
move_info.tablet_uuid = move_tablet_id;
move_info.ts_uuid_from = move.from;
const auto& extra_info = FindOrDie(extra_info_by_tablet_id, move_tablet_id);
@@ -1345,7 +980,7 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
return Status::OK();
}
-bool Rebalancer::AlgoBasedRunner::FindNextMove(size_t* op_idx) {
+bool RebalancerTool::AlgoBasedRunner::FindNextMove(size_t* op_idx) {
vector<size_t> op_indices;
for (auto it = ts_per_op_count_.begin(); op_indices.empty() &&
it != ts_per_op_count_.end() && it->first < max_moves_per_server_; ++it) {
@@ -1391,7 +1026,7 @@ bool Rebalancer::AlgoBasedRunner::FindNextMove(size_t* op_idx) {
return !op_indices.empty();
}
-void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduled(
+void RebalancerTool::AlgoBasedRunner::UpdateOnMoveScheduled(
size_t idx,
const string& tablet_uuid,
const string& src_ts_uuid,
@@ -1408,7 +1043,7 @@ void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduled(
UpdateOnMoveScheduledImpl(idx, dst_ts_uuid, is_success, &dst_op_indices_);
}
-void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduledImpl(
+void RebalancerTool::AlgoBasedRunner::UpdateOnMoveScheduledImpl(
size_t idx,
const string& ts_uuid,
bool is_success,
@@ -1436,8 +1071,8 @@ void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduledImpl(
}
}
-Rebalancer::IntraLocationRunner::IntraLocationRunner(
- Rebalancer* rebalancer,
+RebalancerTool::IntraLocationRunner::IntraLocationRunner(
+ RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
boost::optional<MonoTime> deadline,
@@ -1448,8 +1083,8 @@ Rebalancer::IntraLocationRunner::IntraLocationRunner(
std::move(deadline)),
location_(std::move(location)) {
}
-
-Rebalancer::CrossLocationRunner::CrossLocationRunner(Rebalancer* rebalancer,
+RebalancerTool::CrossLocationRunner::CrossLocationRunner(
+ RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
double load_imbalance_threshold,
@@ -1461,8 +1096,8 @@ Rebalancer::CrossLocationRunner::CrossLocationRunner(Rebalancer* rebalancer,
algorithm_(load_imbalance_threshold) {
}
-Rebalancer::PolicyFixer::PolicyFixer(
- Rebalancer* rebalancer,
+RebalancerTool::PolicyFixer::PolicyFixer(
+ RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
boost::optional<MonoTime> deadline)
@@ -1472,13 +1107,13 @@ Rebalancer::PolicyFixer::PolicyFixer(
std::move(deadline)) {
}
-Status Rebalancer::PolicyFixer::Init(vector<string> master_addresses) {
+Status RebalancerTool::PolicyFixer::Init(vector<string> master_addresses) {
DCHECK(moves_to_schedule_.empty());
return BaseRunner::Init(std::move(master_addresses));
}
-void Rebalancer::PolicyFixer::LoadMoves(
- vector<ReplicaMove> replica_moves) {
+void RebalancerTool::PolicyFixer::LoadMoves(
+ vector<Rebalancer::ReplicaMove> replica_moves) {
// Replace the list of moves operations to schedule. Even if it's not empty,
// some elements of it might be irrelevant anyway, so there is no need to
// keep any since the new information is the most up-to-date. The input list
@@ -1503,8 +1138,8 @@ void Rebalancer::PolicyFixer::LoadMoves(
}
}
-bool Rebalancer::PolicyFixer::ScheduleNextMove(bool* has_errors,
- bool* timed_out) {
+bool RebalancerTool::PolicyFixer::ScheduleNextMove(bool* has_errors,
+ bool* timed_out) {
DCHECK(has_errors);
DCHECK(timed_out);
*has_errors = false;
@@ -1515,7 +1150,7 @@ bool Rebalancer::PolicyFixer::ScheduleNextMove(bool* has_errors,
return false;
}
- ReplicaMove move_info;
+ Rebalancer::ReplicaMove move_info;
if (!FindNextMove(&move_info)) {
return false;
}
@@ -1551,7 +1186,7 @@ bool Rebalancer::PolicyFixer::ScheduleNextMove(bool* has_errors,
return true;
}
-bool Rebalancer::PolicyFixer::UpdateMovesInProgressStatus(
+bool RebalancerTool::PolicyFixer::UpdateMovesInProgressStatus(
bool* has_errors, bool* timed_out) {
DCHECK(has_errors);
DCHECK(timed_out);
@@ -1600,8 +1235,8 @@ bool Rebalancer::PolicyFixer::UpdateMovesInProgressStatus(
return has_updates;
}
-Status Rebalancer::PolicyFixer::GetNextMovesImpl(
- vector<ReplicaMove>* replica_moves) {
+Status RebalancerTool::PolicyFixer::GetNextMovesImpl(
+ vector<Rebalancer::ReplicaMove>* replica_moves) {
ClusterRawInfo raw_info;
RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(boost::none, &raw_info));
@@ -1611,7 +1246,7 @@ Status Rebalancer::PolicyFixer::GetNextMovesImpl(
// automatic re-replication or get unexpected errors while moving replicas.
// TODO(aserbin): move it somewhere else?
for (const auto& s : raw_info.tserver_summaries) {
- if (s.health != KsckServerHealth::HEALTHY) {
+ if (s.health != ServerHealth::HEALTHY) {
if (ContainsKey(ignored_tservers_, s.uuid)) {
continue;
}
@@ -1660,7 +1295,7 @@ Status Rebalancer::PolicyFixer::GetNextMovesImpl(
return Status::OK();
}
-bool Rebalancer::PolicyFixer::FindNextMove(ReplicaMove* move) {
+bool RebalancerTool::PolicyFixer::FindNextMove(Rebalancer::ReplicaMove* move) {
DCHECK(move);
// TODO(aserbin): use pessimistic /2 limit for max_moves_per_servers_
// since the desitnation servers for the move of the replica marked with
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer_tool.h
similarity index 52%
rename from src/kudu/tools/rebalancer.h
rename to src/kudu/tools/rebalancer_tool.h
index 617f33a..2b1f33f 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer_tool.h
@@ -31,18 +31,12 @@
#include <boost/optional/optional.hpp>
#include "kudu/client/shared_ptr.h"
-#include "kudu/tools/ksck_results.h"
-#include "kudu/tools/rebalance_algo.h"
+#include "kudu/rebalance/rebalance_algo.h"
+#include "kudu/rebalance/rebalancer.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace kudu {
-namespace tools {
-struct TabletsPlacementInfo;
-} // namespace tools
-} // namespace kudu
-
-namespace kudu {
namespace client {
class KuduClient;
@@ -51,113 +45,17 @@ class KuduClient;
namespace tools {
class Ksck;
-
-// Sub-set of fields from KsckResult which are relevant to the rebalancing.
-struct ClusterRawInfo {
- std::vector<KsckServerHealthSummary> tserver_summaries;
- std::vector<KsckTableSummary> table_summaries;
- std::vector<KsckTabletSummary> tablet_summaries;
-};
+struct KsckResults;
// A class implementing logic for Kudu cluster rebalancing.
-class Rebalancer {
+// This class inherits from rebalance::Rebalancer but also
+// implements additional functions to print cluster balance
+// information.
+class RebalancerTool : public rebalance::Rebalancer {
public:
- // Configuration parameters for the rebalancer aggregated into a struct.
- struct Config {
- static constexpr double kLoadImbalanceThreshold = 1.0;
-
- // NOLINTNEXTLINE(google-explicit-constructor)
- Config(std::vector<std::string> ignored_tservers_param = {},
- std::vector<std::string> master_addresses = {},
- std::vector<std::string> table_filters = {},
- size_t max_moves_per_server = 5,
- size_t max_staleness_interval_sec = 300,
- int64_t max_run_time_sec = 0,
- bool move_rf1_replicas = false,
- bool output_replica_distribution_details = false,
- bool run_policy_fixer = true,
- bool run_cross_location_rebalancing = true,
- bool run_intra_location_rebalancing = true,
- double load_imbalance_threshold = kLoadImbalanceThreshold);
-
- // UUIDs of ignored servers. If empty, allow to run the
- // rebalancing only when all tablet servers in cluster are healthy.
- // If not empty, allow to run the rebalancing when servers in
- // ignored_tservers are unhealthy.
- std::unordered_set<std::string> ignored_tservers;
-
- // Kudu masters' RPC endpoints.
- std::vector<std::string> master_addresses;
-
- // Names of tables to balance. If empty, every table and the whole cluster
- // will be balanced.
- std::vector<std::string> table_filters;
-
- // Maximum number of move operations to run concurrently on one server.
- // An 'operation on a server' means a move operation where either source or
- // destination replica is located on the specified server.
- size_t max_moves_per_server;
-
- // Maximum duration of the 'staleness' interval, when the rebalancer cannot
- // make any progress in scheduling new moves and no prior scheduled moves
- // are left, even if re-synchronizing against the cluster's state again and
- // again. Such a staleness usually happens in case of a persistent problem
- // with the cluster or when some unexpected concurrent activity is present
- // (such as automatic recovery of failed replicas, etc.).
- size_t max_staleness_interval_sec;
-
- // Maximum run time, in seconds.
- int64_t max_run_time_sec;
-
- // Whether to move replicas of tablets with replication factor of one.
- bool move_rf1_replicas;
-
- // Whether Rebalancer::PrintStats() should output per-table and per-server
- // replica distribution details.
- bool output_replica_distribution_details;
-
- // In case of multi-location cluster, whether to detect and fix placement
- // policy violations. Fixing placement policy violations involves moving
- // tablet replicas across different locations in the cluster.
- // This setting is applicable to multi-location clusters only.
- bool run_policy_fixer;
-
- // In case of multi-location cluster, whether to move tablet replicas
- // between locations in attempt to spread tablet replicas among location
- // evenly (equalizing loads of locations throughout the cluster).
- // This setting is applicable to multi-location clusters only.
- bool run_cross_location_rebalancing;
-
- // In case of multi-location cluster, whether to rebalance tablet replica
- // distribution within each location.
- // This setting is applicable to multi-location clusters only.
- bool run_intra_location_rebalancing;
-
- // The per-table location load imbalance threshold for the cross-location
- // balancing algorithm.
- double load_imbalance_threshold;
- };
-
- // Represents a concrete move of a replica from one tablet server to another.
- // Formed logically from a TableReplicaMove by specifying a tablet for the move.
- struct ReplicaMove {
- std::string tablet_uuid;
- std::string ts_uuid_from;
- std::string ts_uuid_to;
- boost::optional<int64_t> config_opid_idx; // for CAS-enabled Raft changes
- };
-
- enum class RunStatus {
- UNKNOWN,
- CLUSTER_IS_BALANCED,
- TIMED_OUT,
- };
-
- // A helper type: key is tablet UUID which corresponds to value.tablet_uuid.
- typedef std::unordered_map<std::string, ReplicaMove> MovesInProgress;
// Create Rebalancer object with the specified configuration.
- explicit Rebalancer(const Config& config);
+ explicit RebalancerTool(const Config& config);
// Print the stats on the cluster balance information into the 'out' stream.
Status PrintStats(std::ostream& out);
@@ -169,45 +67,10 @@ class Rebalancer {
Status Run(RunStatus* result_status, size_t* moves_count = nullptr);
private:
- // Helper class to find and schedule next available rebalancing move operation
- // and track already scheduled ones.
- class Runner {
- public:
- virtual ~Runner() = default;
-
- // Initialize instance of Runner so it can run against Kudu cluster with
- // the 'master_addresses' RPC endpoints.
- virtual Status Init(std::vector<std::string> master_addresses) = 0;
-
- // Load information on the prescribed replica movement operations. Also,
- // populate helper containers and other auxiliary run-time structures
- // used by ScheduleNextMove(). This method is called with every batch
- // of move operations output by the rebalancing algorithm once previously
- // loaded moves have been scheduled.
- virtual void LoadMoves(std::vector<ReplicaMove> replica_moves) = 0;
-
- // Schedule next replica move. Returns 'true' if replica move operation
- // has been scheduled successfully; otherwise returns 'false' and sets
- // the 'has_errors' and 'timed_out' parameters accordingly.
- virtual bool ScheduleNextMove(bool* has_errors, bool* timed_out) = 0;
-
- // Update statuses and auxiliary information on in-progress replica move
- // operations. The 'timed_out' parameter is set to 'true' if not all
- // in-progress operations were processed by the deadline specified by
- // the 'deadline_' member field. The method returns 'true' if it's necessary
- // to clear the state of the in-progress operations, i.e. 'forget'
- // those, starting from a clean state.
- virtual bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) = 0;
-
- virtual Status GetNextMoves(bool* has_moves) = 0;
-
- virtual uint32_t moves_count() const = 0;
- }; // class Runner
-
// Common base for a few Runner implementations.
class BaseRunner : public Runner {
public:
- BaseRunner(Rebalancer* rebalancer,
+ BaseRunner(RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
boost::optional<MonoTime> deadline);
@@ -228,14 +91,14 @@ class Rebalancer {
// rebalancing algorithm is translated into particular replica movement
// instructions, which are used to populate the 'replica_moves' parameter
// (the container is cleared first).
- virtual Status GetNextMovesImpl(std::vector<ReplicaMove>* moves) = 0;
+ virtual Status GetNextMovesImpl(std::vector<Rebalancer::ReplicaMove>* moves) = 0;
// Update the helper containers once a scheduled operation is complete
// (i.e. succeeded or failed).
void UpdateOnMoveCompleted(const std::string& ts_uuid);
// A pointer to the Rebalancer object.
- Rebalancer* rebalancer_;
+ RebalancerTool* rebalancer_;
// A set of ignored tablet server UUIDs.
const std::unordered_set<std::string> ignored_tservers_;
@@ -246,7 +109,7 @@ class Rebalancer {
const size_t max_moves_per_server_;
// Deadline for the activity performed by the Runner class in
- // ScheduleNextMoves() and UpadteMovesInProgressStatus() methods.
+ // ScheduleNextMove() and UpdateMovesInProgressStatus() methods.
const boost::optional<MonoTime> deadline_;
// Client object to make queries to Kudu masters for various auxiliary info
@@ -255,7 +118,7 @@ class Rebalancer {
// Information on scheduled replica movement operations; keys are
// tablet UUIDs, values are ReplicaMove structures.
- MovesInProgress scheduled_moves_;
+ Rebalancer::MovesInProgress scheduled_moves_;
// Number of successfully completed replica moves operations.
uint32_t moves_count_;
@@ -281,14 +144,14 @@ class Rebalancer {
// The 'deadline' specifies the deadline for the run, 'boost::none'
// if no timeout is set. If 'location' is boost::none, rebalance across
// locations.
- AlgoBasedRunner(Rebalancer* rebalancer,
+ AlgoBasedRunner(RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
boost::optional<MonoTime> deadline);
Status Init(std::vector<std::string> master_addresses) override;
- void LoadMoves(std::vector<ReplicaMove> replica_moves) override;
+ void LoadMoves(std::vector<Rebalancer::ReplicaMove> replica_moves) override;
bool ScheduleNextMove(bool* has_errors, bool* timed_out) override;
@@ -299,10 +162,10 @@ class Rebalancer {
virtual const boost::optional<std::string>& location() const = 0;
// Rebalancing algorithm that running uses to find replica moves.
- virtual RebalancingAlgo* algorithm() = 0;
+ virtual rebalance::RebalancingAlgo* algorithm() = 0;
protected:
- Status GetNextMovesImpl(std::vector<ReplicaMove>* replica_moves) override;
+ Status GetNextMovesImpl(std::vector<Rebalancer::ReplicaMove>* replica_moves) override;
// Using the helper containers src_op_indices_ and dst_op_indices_,
// find the index of the most optimal replica movement operation
@@ -324,7 +187,7 @@ class Rebalancer {
std::unordered_map<std::string, std::set<size_t>>* op_indices);
// The moves to schedule.
- std::vector<ReplicaMove> replica_moves_;
+ std::vector<Rebalancer::ReplicaMove> replica_moves_;
// Mapping 'tserver UUID' --> 'indices of move operations having the
// tserver UUID (i.e. the key) as the source of the move operation'.
@@ -350,13 +213,13 @@ class Rebalancer {
// if no timeout is set. In case of non-location aware cluster or if there
// is just one location defined in the whole cluster, the whole cluster will
// be rebalanced.
- IntraLocationRunner(Rebalancer* rebalancer,
+ IntraLocationRunner(RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
boost::optional<MonoTime> deadline,
std::string location);
- RebalancingAlgo* algorithm() override {
+ rebalance::RebalancingAlgo* algorithm() override {
return &algorithm_;
}
@@ -368,7 +231,7 @@ class Rebalancer {
const boost::optional<std::string> location_;
// An instance of the balancing algorithm.
- TwoDimensionalGreedyAlgo algorithm_;
+ rebalance::TwoDimensionalGreedyAlgo algorithm_;
};
class CrossLocationRunner : public AlgoBasedRunner {
@@ -381,13 +244,13 @@ class Rebalancer {
// balancing algorithm used for finding the most optimal replica movements.
// The 'deadline' specifies the deadline for the run, 'boost::none'
// if no timeout is set.
- CrossLocationRunner(Rebalancer* rebalancer,
+ CrossLocationRunner(RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
double load_imbalance_threshold,
boost::optional<MonoTime> deadline);
- RebalancingAlgo* algorithm() override {
+ rebalance::RebalancingAlgo* algorithm() override {
return &algorithm_;
}
@@ -399,19 +262,19 @@ class Rebalancer {
const boost::optional<std::string> location_ = boost::none;
// An instance of the balancing algorithm.
- LocationBalancingAlgo algorithm_;
+ rebalance::LocationBalancingAlgo algorithm_;
};
class PolicyFixer : public BaseRunner {
public:
- PolicyFixer(Rebalancer* rebalancer,
+ PolicyFixer(RebalancerTool* rebalancer,
std::unordered_set<std::string> ignored_tservers,
size_t max_moves_per_server,
boost::optional<MonoTime> deadline);
Status Init(std::vector<std::string> master_addresses) override;
- void LoadMoves(std::vector<ReplicaMove> replica_moves) override;
+ void LoadMoves(std::vector<Rebalancer::ReplicaMove> replica_moves) override;
bool ScheduleNextMove(bool* has_errors, bool* timed_out) override;
@@ -419,18 +282,16 @@ class Rebalancer {
private:
// Key is tserver UUID which corresponds to value.ts_uuid_from.
- typedef std::unordered_multimap<std::string, ReplicaMove> MovesToSchedule;
+ typedef std::unordered_multimap<std::string, Rebalancer::ReplicaMove> MovesToSchedule;
- Status GetNextMovesImpl(std::vector<ReplicaMove>* replica_moves) override;
+ Status GetNextMovesImpl(std::vector<Rebalancer::ReplicaMove>* replica_moves) override;
- bool FindNextMove(ReplicaMove* move);
+ bool FindNextMove(Rebalancer::ReplicaMove* move);
// Moves yet to schedule.
MovesToSchedule moves_to_schedule_;
};
- friend class KsckResultsToClusterBalanceInfoTest;
-
// Convert ksck results into information relevant to rebalancing the cluster
// at the location specified by 'location' parameter ('boost::none' for
// 'location' means that's about cross-location rebalancing). Basically,
@@ -439,78 +300,34 @@ class Rebalancer {
static Status KsckResultsToClusterRawInfo(
const boost::optional<std::string>& location,
const KsckResults& ksck_info,
- ClusterRawInfo* raw_info);
-
- // Given high-level move-some-tablet-replica-for-a-table information from the
- // rebalancing algorithm, find appropriate tablet replicas to move between the
- // specified tablet servers. The set of result tablet UUIDs is output
- // into the 'tablet_ids' container (note: the container is first cleared).
- // The source and destination replicas are determined by the elements of the
- // 'tablet_ids' container and tablet server UUIDs TableReplicaMove::from and
- // TableReplica::to correspondingly. If no suitable tablet replicas are found,
- // 'tablet_ids' will be empty with the result status of Status::OK().
- static Status FindReplicas(const TableReplicaMove& move,
- const ClusterRawInfo& raw_info,
- std::vector<std::string>* tablet_ids);
-
- // Filter move operations in 'replica_moves': remove all operations that would
- // involve moving replicas of tablets which are in 'scheduled_moves'. The
- // 'replica_moves' cannot be null.
- static void FilterMoves(const MovesInProgress& scheduled_moves,
- std::vector<ReplicaMove>* replica_moves);
-
- // Filter the list of candidate tablets to make sure the location
- // of the destination server would not contain the majority of replicas
- // after the move. The 'tablet_ids' is an in-out parameter.
- static Status FilterCrossLocationTabletCandidates(
- const std::unordered_map<std::string, std::string>& location_by_ts_id,
- const TabletsPlacementInfo& placement_info,
- const TableReplicaMove& move,
- std::vector<std::string>* tablet_ids);
+ rebalance::ClusterRawInfo* raw_info);
// Print information on the cross-location balance.
- Status PrintCrossLocationBalanceStats(const ClusterInfo& ci,
+ Status PrintCrossLocationBalanceStats(const rebalance::ClusterInfo& ci,
std::ostream& out) const;
// Print statistics for the specified location. If 'location' is an empty
// string, that's about printing the cluster-wide stats for a cluster that
// doesn't have any locations defined.
Status PrintLocationBalanceStats(const std::string& location,
- const ClusterRawInfo& raw_info,
- const ClusterInfo& ci,
+ const rebalance::ClusterRawInfo& raw_info,
+ const rebalance::ClusterInfo& ci,
std::ostream& out) const;
- Status PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
+ Status PrintPolicyViolationInfo(const rebalance::ClusterRawInfo& raw_info,
std::ostream& out) const;
- // Convert the 'raw' information about the cluster into information suitable
- // for the input of the high-level rebalancing algorithm.
- // The 'moves_in_progress' parameter contains information on the replica moves
- // which have been scheduled by a caller and still in progress: those are
- // considered as successfully completed and applied to the 'raw_info' when
- // building ClusterBalanceInfo for the specified 'raw_info' input. The idea
- // is to prevent the algorithm outputting the same moves again while some
- // of the moves recommended at prior steps are still in progress.
- // The result cluster balance information is output into the 'info' parameter.
- // The 'info' output parameter cannot be null.
- Status BuildClusterInfo(const ClusterRawInfo& raw_info,
- const MovesInProgress& moves_in_progress,
- ClusterInfo* info) const;
-
// Run rebalancing using the specified runner.
Status RunWith(Runner* runner, RunStatus* result_status);
// Refresh the information on the cluster for the specified location
// (involves running ksck).
Status GetClusterRawInfo(const boost::optional<std::string>& location,
- ClusterRawInfo* raw_info);
+ rebalance::ClusterRawInfo* raw_info);
// Reset ksck-related fields and run ksck against the cluster.
Status RefreshKsckResults();
- // Configuration for the rebalancer.
- const Config config_;
-
// Auxiliary Ksck object to get information on the cluster.
std::shared_ptr<Ksck> ksck_;
};
diff --git a/src/kudu/tools/tool.proto b/src/kudu/tools/tool.proto
index 2a9ff89..570ef45 100644
--- a/src/kudu/tools/tool.proto
+++ b/src/kudu/tools/tool.proto
@@ -202,22 +202,22 @@ message ControlShellRequestPB {
message KsckResultsPB {
repeated string errors = 1;
- repeated KsckServerHealthSummaryPB master_summaries = 2;
- repeated KsckServerHealthSummaryPB tserver_summaries = 3;
+ repeated ServerHealthSummaryPB master_summaries = 2;
+ repeated ServerHealthSummaryPB tserver_summaries = 3;
repeated string master_uuids = 4;
optional bool master_consensus_conflict = 5;
- repeated KsckConsensusStatePB master_consensus_states = 6;
+ repeated ConsensusStatePB master_consensus_states = 6;
- repeated KsckTabletSummaryPB tablet_summaries = 7;
- repeated KsckTableSummaryPB table_summaries = 8;
+ repeated TabletSummaryPB tablet_summaries = 7;
+ repeated TableSummaryPB table_summaries = 8;
optional KsckChecksumResultsPB checksum_results = 9;
repeated KsckVersionSummaryPB version_summaries = 10;
repeated KsckCountSummaryPB count_summaries = 11;
}
-message KsckServerHealthSummaryPB {
+message ServerHealthSummaryPB {
enum ServerHealth {
UNKNOWN = 999;
HEALTHY = 0;
@@ -233,7 +233,7 @@ message KsckServerHealthSummaryPB {
optional string location = 6;
}
-message KsckConsensusStatePB {
+message ConsensusStatePB {
enum ConfigType {
UNKNOWN = 999;
MASTER = 0;
@@ -257,17 +257,17 @@ enum KsckTabletHealthPB {
CONSENSUS_MISMATCH = 4;
}
-message KsckTabletSummaryPB {
+message TabletSummaryPB {
optional string id = 1;
optional string table_id = 2;
optional string table_name = 3;
optional KsckTabletHealthPB health = 4;
optional string status = 5;
- optional KsckConsensusStatePB master_cstate = 6;
- repeated KsckReplicaSummaryPB replicas = 7;
+ optional ConsensusStatePB master_cstate = 6;
+ repeated ReplicaSummaryPB replicas = 7;
}
-message KsckReplicaSummaryPB {
+message ReplicaSummaryPB {
optional string ts_uuid = 1;
optional string ts_address = 2;
optional bool ts_healthy = 3;
@@ -275,10 +275,10 @@ message KsckReplicaSummaryPB {
optional bool is_voter = 5;
optional tablet.TabletStatePB state = 6;
optional tablet.TabletStatusPB status_pb = 7;
- optional KsckConsensusStatePB consensus_state = 8;
+ optional ConsensusStatePB consensus_state = 8;
}
-message KsckTableSummaryPB {
+message TableSummaryPB {
optional string id = 1;
optional string name = 2;
optional KsckTabletHealthPB health = 3;
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index d30cda6..5875dfb 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -33,16 +33,19 @@
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rebalance/cluster_status.h"
+#include "kudu/rebalance/rebalancer.h"
#include "kudu/tools/ksck.h"
#include "kudu/tools/ksck_remote.h"
#include "kudu/tools/ksck_results.h"
-#include "kudu/tools/rebalancer.h"
+#include "kudu/tools/rebalancer_tool.h"
#include "kudu/tools/tool_action.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/tools/tool_replica_util.h"
#include "kudu/util/status.h"
#include "kudu/util/version_util.h"
+using kudu::rebalance::Rebalancer;
using std::cout;
using std::endl;
using std::make_tuple;
@@ -123,7 +126,7 @@ DEFINE_bool(disable_intra_location_rebalancing, false,
"This setting is applicable to multi-location clusters only.");
DEFINE_double(load_imbalance_threshold,
- kudu::tools::Rebalancer::Config::kLoadImbalanceThreshold,
+ kudu::rebalance::Rebalancer::Config::kLoadImbalanceThreshold,
"The threshold for the per-table location load imbalance. "
"The threshold is used during the cross-location rebalancing "
"phase. If the measured cross-location load imbalance for a "
@@ -225,8 +228,8 @@ Status EvaluateMoveSingleReplicasFlag(const vector<string>& master_addresses,
ignore_result(ksck->Run());
const auto& ksck_results = ksck->results();
- for (const auto& summaries : { ksck_results.tserver_summaries,
- ksck_results.master_summaries }) {
+ for (const auto& summaries : { ksck_results.cluster_status.tserver_summaries,
+ ksck_results.cluster_status.master_summaries }) {
for (const auto& summary : summaries) {
if (summary.version) {
if (!VersionSupportsRF1Movement(*summary.version)) {
@@ -255,8 +258,8 @@ Status EvaluateMoveSingleReplicasFlag(const vector<string>& master_addresses,
// available. The idea is to reduce the risk of unintended unavailability
// unless it's explicitly requested by the operator.
boost::optional<string> tid;
- if (!ksck_results.tablet_summaries.empty()) {
- tid = ksck_results.tablet_summaries.front().id;
+ if (!ksck_results.cluster_status.tablet_summaries.empty()) {
+ tid = ksck_results.cluster_status.tablet_summaries.front().id;
}
bool is_343_scheme = false;
auto s = Is343SchemeCluster(master_addresses, tid, &is_343_scheme);
@@ -293,7 +296,7 @@ Status RunRebalance(const RunnerContext& context) {
bool move_single_replicas = false;
RETURN_NOT_OK(EvaluateMoveSingleReplicasFlag(master_addresses,
&move_single_replicas));
- Rebalancer rebalancer(Rebalancer::Config(
+ RebalancerTool rebalancer(Rebalancer::Config(
ignored_tservers,
master_addresses,
table_filters,
@@ -314,17 +317,17 @@ Status RunRebalance(const RunnerContext& context) {
return Status::OK();
}
- Rebalancer::RunStatus result_status;
+ RebalancerTool::RunStatus result_status;
size_t moves_count;
RETURN_NOT_OK(rebalancer.Run(&result_status, &moves_count));
const string msg_template = "rebalancing is complete: $0 (moved $1 replicas)";
string msg_result_status;
switch (result_status) {
- case Rebalancer::RunStatus::CLUSTER_IS_BALANCED:
+ case RebalancerTool::RunStatus::CLUSTER_IS_BALANCED:
msg_result_status = "cluster is balanced";
break;
- case Rebalancer::RunStatus::TIMED_OUT:
+ case RebalancerTool::RunStatus::TIMED_OUT:
msg_result_status = "time is up";
break;
default: