You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2018/11/06 22:38:30 UTC
[2/3] kudu git commit: [rebalancer] location-aware rebalancer (part 9/n)
[rebalancer] location-aware rebalancer (part 9/n)
Updated reporting functionality of the rebalancer tool to output
information on placement policy violations and other relevant
information for location-aware clusters.
Added one simple integration test as well.
Change-Id: I8407e9f8cf6b41a6aeb075372d852125d9739e08
Reviewed-on: http://gerrit.cloudera.org:8080/11862
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8e9345a7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8e9345a7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8e9345a7
Branch: refs/heads/master
Commit: 8e9345a79849ed3e96a85dc5240e0a4e709b2055
Parents: e172df4
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Oct 26 18:25:24 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Tue Nov 6 21:59:21 2018 +0000
----------------------------------------------------------------------
src/kudu/tools/placement_policy_util-test.cc | 38 +--
src/kudu/tools/placement_policy_util.cc | 2 +-
src/kudu/tools/placement_policy_util.h | 1 -
src/kudu/tools/rebalancer.cc | 332 +++++++++++++++-------
src/kudu/tools/rebalancer.h | 15 +
src/kudu/tools/rebalancer_tool-test.cc | 208 ++++++++++++--
6 files changed, 456 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util-test.cc b/src/kudu/tools/placement_policy_util-test.cc
index 7d48cbe..cccd611 100644
--- a/src/kudu/tools/placement_policy_util-test.cc
+++ b/src/kudu/tools/placement_policy_util-test.cc
@@ -139,19 +139,19 @@ void ClusterConfigToClusterPlacementInfo(const TestClusterConfig& tcc,
*tpi = std::move(result_tpi);
}
-// TODO(aserbin): is it needed at all?
bool operator==(const PlacementPolicyViolationInfo& lhs,
const PlacementPolicyViolationInfo& rhs) {
return lhs.tablet_id == rhs.tablet_id &&
lhs.majority_location == rhs.majority_location &&
lhs.replicas_num_at_majority_location ==
- rhs.replicas_num_at_majority_location &&
- lhs.replication_factor == rhs.replication_factor;
+ rhs.replicas_num_at_majority_location;
}
ostream& operator<<(ostream& s, const PlacementPolicyViolationInfo& info) {
s << "{tablet_id: " << info.tablet_id
- << ", location: " << info.majority_location << "}";
+ << ", location: " << info.majority_location
+ << ", replicas_num_at_majority_location: "
+ << info.replicas_num_at_majority_location << "}";
return s;
}
@@ -327,7 +327,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
{ "D", {} },
{ "E", {} },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 3 }, },
{ { "t0", "C" }, }
},
@@ -345,7 +345,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
{ "B", { "t0", } },
{ "C", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 2 }, },
{},
},
@@ -364,7 +364,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
{ "C", { "t0", } },
{ "D", {} },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 2 }, },
{ { "t0", "B" }, }
},
};
@@ -390,7 +390,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) {
{ "D", { "t1", "x1", } }, { "E", { "t1", } },
{ "F", { "t1", } },
},
- { { "t0", "L0" }, { "t1", "L1" }, },
+ { { "t0", "L0", 3 }, { "t1", "L1", 2 }, },
{ { "t0", "B" }, { "t1", "E" } }
},
@@ -410,7 +410,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) {
{ "D", { "t1", "t2", } }, { "E", { "t1", "t3", } },
{ "F", { "t1", "t2", "t3", } },
},
- { { "t0", "L0" }, { "t1", "L1" }, },
+ { { "t0", "L0", 3 }, { "t1", "L1", 2 }, },
{ { "t0", "B" }, { "t1", "E" } }
},
};
@@ -441,7 +441,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
{ "E", { "t0", } },
{ "F", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 3 }, },
{},
},
// One RF=7 tablet with the distribution of its replica placement violating
@@ -467,7 +467,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
{ "G", { "t0", } },
{ "H", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 4 }, },
{},
},
{
@@ -485,7 +485,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
{ "D", { "t0", } }, { "E", { "t0", } },
{ "F", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 3 }, },
{}
},
};
@@ -525,7 +525,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
{ "C", { "t1", } },
{ "D", { "t1", } },
},
- { { "t0", "L0" }, { "t1", "L0" }, },
+ { { "t0", "L0", 2 }, { "t1", "L0", 4 }, },
{}
},
{
@@ -541,7 +541,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
{ "A", { "t0", } }, { "B", { "t0", } },
{ "D", { "t1", } }, { "E", { "t1", } },
},
- { { "t0", "L0" }, { "t1", "L1" }, },
+ { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
{}
},
{
@@ -558,7 +558,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
{ "A", { "t0", "t1", } }, { "B", { "t0", "t1", } },
{ "D", { "t1", } }, { "E", { "t1", } },
},
- { { "t0", "L0" }, { "t1", "L1" }, },
+ { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
{}
},
{
@@ -574,7 +574,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
{ "A", { "t0", } }, { "B", { "t0", } },
{ "C", { "t1", } }, { "D", { "t1", } },
},
- { { "t0", "L0" }, { "t1", "L1" }, },
+ { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
{}
},
{
@@ -592,7 +592,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
{ "D", { "t0", } },
{ "F", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 2 }, },
{}
},
};
@@ -616,7 +616,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRF) {
{ "D", { "t0", } }, { "F", { "t0", } },
{ "H", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 3 }, },
{ { "t0", "B" }, }
},
{
@@ -635,7 +635,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRF) {
{ "G", { "t0", } },
{ "H", { "t0", } },
},
- { { "t0", "L1" }, },
+ { { "t0", "L1", 4 }, },
{ { "t0", "D" }, }
},
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util.cc b/src/kudu/tools/placement_policy_util.cc
index be1b502..f5ab790 100644
--- a/src/kudu/tools/placement_policy_util.cc
+++ b/src/kudu/tools/placement_policy_util.cc
@@ -333,7 +333,7 @@ Status DetectPlacementPolicyViolations(
tablet_id, max_replicas_num, rep_factor, max_replicas_location);
}
if (is_policy_violated) {
- info.push_back({ tablet_id, max_replicas_location });
+ info.push_back({ tablet_id, max_replicas_location, max_replicas_num });
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util.h b/src/kudu/tools/placement_policy_util.h
index e54848d..2938d17 100644
--- a/src/kudu/tools/placement_policy_util.h
+++ b/src/kudu/tools/placement_policy_util.h
@@ -86,7 +86,6 @@ Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info,
struct PlacementPolicyViolationInfo {
std::string tablet_id;
std::string majority_location;
- int replication_factor;
int replicas_num_at_majority_location;
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
index 4d2d769..46f21f6 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer.cc
@@ -57,6 +57,7 @@ using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using std::accumulate;
using std::endl;
+using std::back_inserter;
using std::inserter;
using std::ostream;
using std::map;
@@ -69,6 +70,7 @@ using std::shared_ptr;
using std::sort;
using std::string;
using std::to_string;
+using std::transform;
using std::unordered_map;
using std::unordered_set;
using std::vector;
@@ -105,7 +107,7 @@ Rebalancer::Rebalancer(const Config& config)
: config_(config) {
}
-Status Rebalancer::PrintStats(std::ostream& out) {
+Status Rebalancer::PrintStats(ostream& out) {
// First, report on the current balance state of the cluster.
RETURN_NOT_OK(RefreshKsckResults());
const KsckResults& results = ksck_->results();
@@ -116,103 +118,49 @@ Status Rebalancer::PrintStats(std::ostream& out) {
ClusterInfo ci;
RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
- // Per-server replica distribution stats.
- {
- out << "Per-server replica distribution summary:" << endl;
- DataTable summary({"Statistic", "Value"});
-
- const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
- if (servers_load_info.empty()) {
- summary.AddRow({ "N/A", "N/A" });
- } else {
- const int64_t total_replica_count = accumulate(
- servers_load_info.begin(), servers_load_info.end(), 0L,
- [](int64_t sum, const pair<int32_t, string>& elem) {
- return sum + elem.first;
- });
-
- const auto min_replica_count = servers_load_info.begin()->first;
- const auto max_replica_count = servers_load_info.rbegin()->first;
- const double avg_replica_count =
- 1.0 * total_replica_count / servers_load_info.size();
-
- summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) });
- summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) });
- summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) });
- }
- RETURN_NOT_OK(summary.PrintTo(out));
- out << endl;
-
- if (config_.output_replica_distribution_details) {
- const auto& tserver_summaries = results.tserver_summaries;
- unordered_map<string, string> tserver_endpoints;
- for (const auto& summary : tserver_summaries) {
- tserver_endpoints.emplace(summary.uuid, summary.address);
- }
-
- out << "Per-server replica distribution details:" << endl;
- DataTable servers_info({ "UUID", "Address", "Replica Count" });
- for (const auto& elem : servers_load_info) {
- const auto& id = elem.second;
- servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) });
- }
- RETURN_NOT_OK(servers_info.PrintTo(out));
- out << endl;
- }
+ const auto& ts_id_by_location = ci.locality.servers_by_location;
+ if (ts_id_by_location.empty()) {
+ // Nothing to report about: there are no tablet servers reported.
+ out << "an empty cluster" << endl;
+ return Status::OK();
}
- // Per-table replica distribution stats.
- {
- out << "Per-table replica distribution summary:" << endl;
- DataTable summary({ "Replica Skew", "Value" });
- const auto& table_skew_info = ci.balance.table_info_by_skew;
- if (table_skew_info.empty()) {
- summary.AddRow({ "N/A", "N/A" });
- } else {
- const auto min_table_skew = table_skew_info.begin()->first;
- const auto max_table_skew = table_skew_info.rbegin()->first;
- const int64_t sum_table_skew = accumulate(
- table_skew_info.begin(), table_skew_info.end(), 0L,
- [](int64_t sum, const pair<int32_t, TableBalanceInfo>& elem) {
- return sum + elem.first;
- });
- double avg_table_skew = 1.0 * sum_table_skew / table_skew_info.size();
-
- summary.AddRow({ "Minimum", to_string(min_table_skew) });
- summary.AddRow({ "Maximum", to_string(max_table_skew) });
- summary.AddRow({ "Average", to_string(avg_table_skew) });
- }
- RETURN_NOT_OK(summary.PrintTo(out));
- out << endl;
+ if (ts_id_by_location.size() == 1) {
+ // A single location: print the information for the whole cluster.
+ return PrintLocationBalanceStats(ts_id_by_location.begin()->first,
+ raw_info, ci, out);
+ }
- if (config_.output_replica_distribution_details) {
- const auto& table_summaries = results.table_summaries;
- unordered_map<string, const KsckTableSummary*> table_info;
- for (const auto& summary : table_summaries) {
- table_info.emplace(summary.id, &summary);
- }
- out << "Per-table replica distribution details:" << endl;
- DataTable skew(
- { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
- for (const auto& elem : table_skew_info) {
- const auto& table_id = elem.second.table_id;
- const auto it = table_info.find(table_id);
- const auto* table_summary =
- (it == table_info.end()) ? nullptr : it->second;
- const auto& table_name = table_summary ? table_summary->name : "";
- const auto total_replica_count = table_summary
- ? table_summary->replication_factor * table_summary->TotalTablets()
- : 0;
- skew.AddRow({ table_id,
- to_string(total_replica_count),
- to_string(elem.first),
- table_name });
- }
- RETURN_NOT_OK(skew.PrintTo(out));
- out << endl;
- }
+ // The stats are more detailed in the case of a multi-location cluster.
+ DCHECK_GT(ts_id_by_location.size(), 1);
+
+ // 1. Print information about cross-location balance.
+ RETURN_NOT_OK(PrintCrossLocationBalanceStats(ci, out));
+
+ // 2. Iterating over locations in the cluster, print per-location balance
+ // information. Since the ts_id_by_location is not sorted, let's first
+ // create a sorted list of locations so the output would be sorted by
+ // location.
+ vector<string> locations;
+ locations.reserve(ts_id_by_location.size());
+ transform(ts_id_by_location.cbegin(), ts_id_by_location.cend(),
+ back_inserter(locations),
+ [](const unordered_map<string, set<string>>::value_type& elem) {
+ return elem.first;
+ });
+ sort(locations.begin(), locations.end());
+
+ for (const auto& location : locations) {
+ ClusterRawInfo raw_info;
+ RETURN_NOT_OK(KsckResultsToClusterRawInfo(location, results, &raw_info));
+ ClusterInfo ci;
+ RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
+ RETURN_NOT_OK(PrintLocationBalanceStats(location, raw_info, ci, out));
}
+ // 3. Print information about placement policy violations.
+ RETURN_NOT_OK(PrintPolicyViolationInfo(raw_info, out));
+
return Status::OK();
}
@@ -538,6 +486,194 @@ Status Rebalancer::FilterCrossLocationTabletCandidates(
return Status::OK();
}
+Status Rebalancer::PrintCrossLocationBalanceStats(const ClusterInfo& ci,
+ ostream& out) const {
+ // Print location load information.
+ map<string, int64_t> replicas_num_by_location;
+ for (const auto& elem : ci.balance.servers_by_total_replica_count) {
+ const auto& location = FindOrDie(ci.locality.location_by_ts_id, elem.second);
+ LookupOrEmplace(&replicas_num_by_location, location, 0) += elem.first;
+ }
+ out << "Locations load summary:" << endl;
+ DataTable location_load_summary({"Location", "Load"});
+ for (const auto& elem : replicas_num_by_location) {
+ const auto& location = elem.first;
+ const auto servers_num =
+ FindOrDie(ci.locality.servers_by_location, location).size();
+ CHECK_GT(servers_num, 0);
+ double location_load = static_cast<double>(elem.second) / servers_num;
+ location_load_summary.AddRow({ location, to_string(location_load) });
+ }
+ RETURN_NOT_OK(location_load_summary.PrintTo(out));
+ out << endl;
+
+ return Status::OK();
+}
+
+Status Rebalancer::PrintLocationBalanceStats(const string& location,
+ const ClusterRawInfo& raw_info,
+ const ClusterInfo& ci,
+ ostream& out) const {
+ if (!location.empty()) {
+ out << "--------------------------------------------------" << endl;
+ out << "Location: " << location << endl;
+ out << "--------------------------------------------------" << endl;
+ }
+
+ // Per-server replica distribution stats.
+ {
+ out << "Per-server replica distribution summary:" << endl;
+ DataTable summary({"Statistic", "Value"});
+
+ const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
+ if (servers_load_info.empty()) {
+ summary.AddRow({ "N/A", "N/A" });
+ } else {
+ const int64_t total_replica_count = accumulate(
+ servers_load_info.begin(), servers_load_info.end(), 0L,
+ [](int64_t sum, const pair<int32_t, string>& elem) {
+ return sum + elem.first;
+ });
+
+ const auto min_replica_count = servers_load_info.begin()->first;
+ const auto max_replica_count = servers_load_info.rbegin()->first;
+ const double avg_replica_count =
+ 1.0 * total_replica_count / servers_load_info.size();
+
+ summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) });
+ summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) });
+ summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) });
+ }
+ RETURN_NOT_OK(summary.PrintTo(out));
+ out << endl;
+
+ if (config_.output_replica_distribution_details) {
+ const auto& tserver_summaries = raw_info.tserver_summaries;
+ unordered_map<string, string> tserver_endpoints;
+ for (const auto& summary : tserver_summaries) {
+ tserver_endpoints.emplace(summary.uuid, summary.address);
+ }
+
+ out << "Per-server replica distribution details:" << endl;
+ DataTable servers_info({ "UUID", "Address", "Replica Count" });
+ for (const auto& elem : servers_load_info) {
+ const auto& id = elem.second;
+ servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) });
+ }
+ RETURN_NOT_OK(servers_info.PrintTo(out));
+ out << endl;
+ }
+ }
+
+ // Per-table replica distribution stats.
+ {
+ out << "Per-table replica distribution summary:" << endl;
+ DataTable summary({ "Replica Skew", "Value" });
+ const auto& table_skew_info = ci.balance.table_info_by_skew;
+ if (table_skew_info.empty()) {
+ summary.AddRow({ "N/A", "N/A" });
+ } else {
+ const auto min_table_skew = table_skew_info.begin()->first;
+ const auto max_table_skew = table_skew_info.rbegin()->first;
+ const int64_t sum_table_skew = accumulate(
+ table_skew_info.begin(), table_skew_info.end(), 0L,
+ [](int64_t sum, const pair<int32_t, TableBalanceInfo>& elem) {
+ return sum + elem.first;
+ });
+ double avg_table_skew = 1.0 * sum_table_skew / table_skew_info.size();
+
+ summary.AddRow({ "Minimum", to_string(min_table_skew) });
+ summary.AddRow({ "Maximum", to_string(max_table_skew) });
+ summary.AddRow({ "Average", to_string(avg_table_skew) });
+ }
+ RETURN_NOT_OK(summary.PrintTo(out));
+ out << endl;
+
+ if (config_.output_replica_distribution_details) {
+ const auto& table_summaries = raw_info.table_summaries;
+ unordered_map<string, const KsckTableSummary*> table_info;
+ for (const auto& summary : table_summaries) {
+ table_info.emplace(summary.id, &summary);
+ }
+ out << "Per-table replica distribution details:" << endl;
+ DataTable skew(
+ { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
+ for (const auto& elem : table_skew_info) {
+ const auto& table_id = elem.second.table_id;
+ const auto it = table_info.find(table_id);
+ const auto* table_summary =
+ (it == table_info.end()) ? nullptr : it->second;
+ const auto& table_name = table_summary ? table_summary->name : "";
+ const auto total_replica_count = table_summary
+ ? table_summary->replication_factor * table_summary->TotalTablets()
+ : 0;
+ skew.AddRow({ table_id,
+ to_string(total_replica_count),
+ to_string(elem.first),
+ table_name });
+ }
+ RETURN_NOT_OK(skew.PrintTo(out));
+ out << endl;
+ }
+ }
+
+ return Status::OK();
+}
+
+Status Rebalancer::PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
+ ostream& out) const {
+ TabletsPlacementInfo placement_info;
+ RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &placement_info));
+ vector<PlacementPolicyViolationInfo> ppvi;
+ RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi));
+ out << "Placement policy violations:" << endl;
+ if (ppvi.empty()) {
+ out << " none" << endl << endl;;
+ return Status::OK();
+ }
+
+ if (config_.output_replica_distribution_details) {
+ DataTable stats(
+ { "Location", "Table Name", "Tablet", "RF", "Replicas at location" });
+ for (const auto& info : ppvi) {
+ const auto& table_id = FindOrDie(placement_info.tablet_to_table_id,
+ info.tablet_id);
+ const auto& table_info = FindOrDie(placement_info.tables_info, table_id);
+ stats.AddRow({ info.majority_location,
+ table_info.name,
+ info.tablet_id,
+ to_string(table_info.replication_factor),
+ to_string(info.replicas_num_at_majority_location) });
+ }
+ RETURN_NOT_OK(stats.PrintTo(out));
+ } else {
+ DataTable summary({ "Location",
+ "Number of non-complying tables",
+ "Number of non-complying tablets" });
+ typedef pair<unordered_set<string>, unordered_set<string>> TableTabletIds;
+ // Location --> sets of identifiers of tables and tablets hosted by the
+ // tablet servers at the location. The summary is sorted by location.
+ map<string, TableTabletIds> info_by_location;
+ for (const auto& info : ppvi) {
+ const auto& table_id = FindOrDie(placement_info.tablet_to_table_id,
+ info.tablet_id);
+ auto& elem = LookupOrEmplace(&info_by_location,
+ info.majority_location, TableTabletIds());
+ elem.first.emplace(table_id);
+ elem.second.emplace(info.tablet_id);
+ }
+ for (const auto& elem : info_by_location) {
+ summary.AddRow({ elem.first,
+ to_string(elem.second.first.size()),
+ to_string(elem.second.second.size()) });
+ }
+ RETURN_NOT_OK(summary.PrintTo(out));
+ }
+ out << endl;
+
+ return Status::OK();
+}
+
Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
const MovesInProgress& moves_in_progress,
ClusterInfo* info) const {
@@ -1084,11 +1220,11 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
return Status::OK();
}
unordered_set<string> tablets_in_move;
- std::transform(scheduled_moves_.begin(), scheduled_moves_.end(),
- inserter(tablets_in_move, tablets_in_move.begin()),
- [](const MovesInProgress::value_type& elem) {
- return elem.first;
- });
+ transform(scheduled_moves_.begin(), scheduled_moves_.end(),
+ inserter(tablets_in_move, tablets_in_move.begin()),
+ [](const MovesInProgress::value_type& elem) {
+ return elem.first;
+ });
for (const auto& move : moves) {
vector<string> tablet_ids;
RETURN_NOT_OK(FindReplicas(move, raw_info, &tablet_ids));
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index 7bb0d73..cbaef49 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -442,6 +442,21 @@ class Rebalancer {
const TableReplicaMove& move,
std::vector<std::string>* tablet_ids);
+ // Print information on the cross-location balance.
+ Status PrintCrossLocationBalanceStats(const ClusterInfo& ci,
+ std::ostream& out) const;
+
+ // Print statistics for the specified location. If 'location' is an empty
+ // string, print the cluster-wide stats instead: that is the case of a cluster
+ // that doesn't have any locations defined.
+ Status PrintLocationBalanceStats(const std::string& location,
+ const ClusterRawInfo& raw_info,
+ const ClusterInfo& ci,
+ std::ostream& out) const;
+
+ Status PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
+ std::ostream& out) const;
+
// Convert the 'raw' information about the cluster into information suitable
// for the input of the high-level rebalancing algorithm.
// The 'moves_in_progress' parameter contains information on the replica moves
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer_tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index 60e9f45..9fcd997 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -25,6 +25,7 @@
#include <string>
#include <thread>
#include <unordered_map>
+#include <unordered_set>
#include <utility>
#include <vector>
@@ -88,6 +89,7 @@ using std::thread;
using std::tuple;
using std::unique_ptr;
using std::unordered_map;
+using std::unordered_set;
using std::vector;
using strings::Substitute;
@@ -207,28 +209,14 @@ TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
ASSERT_STR_MATCHES(err, err_msg_pattern);
}
-// Create tables with unbalanced replica distribution: useful in
-// rebalancer-related tests.
-static Status CreateUnbalancedTables(
+static Status CreateTables(
cluster::ExternalMiniCluster* cluster,
client::KuduClient* client,
const Schema& table_schema,
const string& table_name_pattern,
int num_tables,
int rep_factor,
- int tserver_idx_from,
- int tserver_num,
- int tserver_unresponsive_ms,
vector<string>* table_names = nullptr) {
- // Keep running only some tablet servers and shut down the rest.
- for (auto i = tserver_idx_from; i < tserver_num; ++i) {
- cluster->tablet_server(i)->Shutdown();
- }
-
- // Wait for the catalog manager to understand that not all tablet servers
- // are available.
- SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
-
// Create tables with their tablet replicas landing only on the tablet servers
// which are up and running.
auto client_schema = KuduSchema::FromSchema(table_schema);
@@ -253,6 +241,32 @@ static Status CreateUnbalancedTables(
}
}
+ return Status::OK();
+}
+
+// Create tables with unbalanced replica distribution: useful in
+// rebalancer-related tests.
+static Status CreateUnbalancedTables(
+ cluster::ExternalMiniCluster* cluster,
+ client::KuduClient* client,
+ const Schema& table_schema,
+ const string& table_name_pattern,
+ int num_tables,
+ int rep_factor,
+ int tserver_idx_from,
+ int tserver_num,
+ int tserver_unresponsive_ms,
+ vector<string>* table_names = nullptr) {
+ // Keep running only some tablet servers and shut down the rest.
+ for (auto i = tserver_idx_from; i < tserver_num; ++i) {
+ cluster->tablet_server(i)->Shutdown();
+ }
+
+ // Wait for the catalog manager to understand that not all tablet servers
+ // are available.
+ SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
+ RETURN_NOT_OK(CreateTables(cluster, client, table_schema, table_name_pattern,
+ num_tables, rep_factor, table_names));
for (auto i = tserver_idx_from; i < tserver_num; ++i) {
RETURN_NOT_OK(cluster->tablet_server(i)->Restart());
}
@@ -404,9 +418,13 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
static const char* const kExitOnSignalStr;
static const char* const kTableNamePattern;
+ // Working around limitations of older libstdc++.
+ static const unordered_set<string> kEmptySet;
+
void Prepare(const vector<string>& extra_tserver_flags = {},
const vector<string>& extra_master_flags = {},
const LocationInfo& location_info = {},
+ const unordered_set<string>& empty_locations = kEmptySet,
vector<string>* created_tables_names = nullptr) {
const auto& scheme_flag = Substitute(
"--raft_prepare_replacement_before_eviction=$0", is_343_scheme());
@@ -420,12 +438,60 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
FLAGS_num_tablet_servers = num_tservers_;
FLAGS_num_replicas = rep_factor_;
- NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info));
+ NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info,
+ /*create_table=*/ false));
+
+ if (location_info.empty()) {
+ ASSERT_OK(CreateUnbalancedTables(
+ cluster_.get(), client_.get(), schema_, kTableNamePattern,
+ num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
+ tserver_unresponsive_ms_, created_tables_names));
+ } else {
+ ASSERT_OK(CreateTablesExcludingLocations(empty_locations,
+ created_tables_names));
+ }
+ }
+
+ // Create tables placing their tablet replicas everywhere but not at the
+ // tablet servers in the specified locations. This is similar to
+ // CreateUnbalancedTables() but the set of tablet servers to avoid is defined
+ // by the set of the specified locations.
+ Status CreateTablesExcludingLocations(
+ const unordered_set<string>& excluded_locations,
+ vector<string>* table_names = nullptr) {
+ // Shut down all tablet servers in the specified locations so no tablet
+ // replicas would be hosted by those servers.
+ unordered_set<string> seen_locations;
+ if (!excluded_locations.empty()) {
+ for (const auto& elem : tablet_servers_) {
+ auto* ts = elem.second;
+ if (ContainsKey(excluded_locations, ts->location)) {
+ cluster_->tablet_server_by_uuid(ts->uuid())->Shutdown();
+ EmplaceIfNotPresent(&seen_locations, ts->location);
+ }
+ }
+ }
+ // Sanity check: every specified location should have been seen, otherwise
+ // something is wrong with the tablet servers' registration.
+ CHECK_EQ(excluded_locations.size(), seen_locations.size());
+
+ // Wait for the catalog manager to understand that not all tablet servers
+ // are available.
+ SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms_ / 4));
+ RETURN_NOT_OK(CreateTables(cluster_.get(), client_.get(), schema_,
+ kTableNamePattern, num_tables_, rep_factor_,
+ table_names));
+ // Start tablet servers at the excluded locations.
+ if (!excluded_locations.empty()) {
+ for (const auto& elem : tablet_servers_) {
+ auto* ts = elem.second;
+ if (ContainsKey(excluded_locations, ts->location)) {
+ RETURN_NOT_OK(cluster_->tablet_server_by_uuid(ts->uuid())->Restart());
+ }
+ }
+ }
- ASSERT_OK(CreateUnbalancedTables(
- cluster_.get(), client_.get(), schema_, kTableNamePattern,
- num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
- tserver_unresponsive_ms_, created_tables_names));
+ return Status::OK();
}
// When the rebalancer starts moving replicas, ksck detects corruption
@@ -454,6 +520,7 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
};
const char* const RebalancingTest::kExitOnSignalStr = "kudu: process exited on signal";
const char* const RebalancingTest::kTableNamePattern = "rebalance_test_table_$0";
+const unordered_set<string> RebalancingTest::kEmptySet = unordered_set<string>();
typedef testing::WithParamInterface<Kudu1097> Kudu1097ParamTest;
@@ -1185,7 +1252,7 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) {
const LocationInfo location_info = { { "/A", 2 }, { "/B", 2 }, { "/C", 2 }, };
vector<string> table_names;
- NO_FATALS(Prepare({}, {}, location_info, &table_names));
+ NO_FATALS(Prepare({}, {}, location_info, kEmptySet, &table_names));
const vector<string> tool_args = {
"cluster",
@@ -1261,5 +1328,104 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) {
}
}
+class LocationAwareBalanceInfoTest : public RebalancingTest {
+ public:
+ LocationAwareBalanceInfoTest()
+ : RebalancingTest(/*num_tables=*/ 1,
+ /*rep_factor=*/ 3,
+ /*num_tservers=*/ 5) {
+ }
+
+ bool is_343_scheme() const override {
+ // These tests are for the 3-4-3 replica management scheme only.
+ return true;
+ }
+};
+
+// Verify the output of the location-aware rebalancer against a cluster
+// that has multiple locations.
+TEST_F(LocationAwareBalanceInfoTest, ReportOnly) {
+ static const char kReferenceOutput[] =
+ R"***(Locations load summary:
+ Location | Load
+----------+----------
+ /A | 3.000000
+ /B | 3.000000
+ /C | 0.000000
+
+--------------------------------------------------
+Location: /A
+--------------------------------------------------
+Per-server replica distribution summary:
+ Statistic | Value
+-----------------------+----------
+ Minimum Replica Count | 3
+ Maximum Replica Count | 3
+ Average Replica Count | 3.000000
+
+Per-table replica distribution summary:
+ Replica Skew | Value
+--------------+----------
+ Minimum | 0
+ Maximum | 0
+ Average | 0.000000
+
+--------------------------------------------------
+Location: /B
+--------------------------------------------------
+Per-server replica distribution summary:
+ Statistic | Value
+-----------------------+----------
+ Minimum Replica Count | 3
+ Maximum Replica Count | 3
+ Average Replica Count | 3.000000
+
+Per-table replica distribution summary:
+ Replica Skew | Value
+--------------+----------
+ Minimum | 0
+ Maximum | 0
+ Average | 0.000000
+
+--------------------------------------------------
+Location: /C
+--------------------------------------------------
+Per-server replica distribution summary:
+ Statistic | Value
+-----------------------+----------
+ Minimum Replica Count | 0
+ Maximum Replica Count | 0
+ Average Replica Count | 0.000000
+
+Per-table replica distribution summary:
+ Replica Skew | Value
+--------------+-------
+ N/A | N/A
+
+Placement policy violations:
+ Location | Number of non-complying tables | Number of non-complying tablets
+----------+--------------------------------+---------------------------------
+ /B | 1 | 3
+)***";
+
+ const LocationInfo location_info = { { "/A", 1 }, { "/B", 2 }, { "/C", 2 }, };
+ NO_FATALS(Prepare({}, {}, location_info, { "/C" }));
+
+ string out;
+ string err;
+ Status s = RunKuduTool({
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ "--report_only",
+ }, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ // The output should match the reference report.
+ ASSERT_STR_CONTAINS(out, kReferenceOutput);
+ // The actual rebalancing should not run.
+ ASSERT_STR_NOT_CONTAINS(out, "rebalancing is complete:")
+ << ToolRunInfo(s, out, err);
+}
+
} // namespace tools
} // namespace kudu