You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/04/04 18:22:23 UTC
[kudu] branch master updated: [tools] range rebalancing for 'kudu cluster rebalance'
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 01ca8c3f2 [tools] range rebalancing for 'kudu cluster rebalance'
01ca8c3f2 is described below
commit 01ca8c3f2bb38bc8b738d86d3d88aa8d1ffaad87
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Mar 9 14:57:45 2022 -0800
[tools] range rebalancing for 'kudu cluster rebalance'
This patch adds range rebalancing functionality into the
'kudu cluster rebalance' CLI tool. The implementation is rather an
MVP: the range rebalancing can now be performed only for a single
table per run. As far as I can see, there is room for improvement
since it's possible to perform range-aware replica movements even
during standard whole cluster rebalancing.
Below are two snapshots of the distribution of the range-specific tablet
replicas in a cluster. Those are produced by running the tool with
extra --report_only --output_replica_distribution_details flags
before and after range rebalancing for a single table:
kudu cluster rebalance \
--enable_range_rebalancing \
--tables=default.loadgen_auto_6800f4ec4e164b2b8e42db7b5044df09 \
127.0.0.1:8765
before:
========================================================================
Table: abb2bbf8b4ff4bc0989bc82c78d4ae2b
Number of tablet replicas at servers for each range
Max Skew | Total Count | Range Start Key
----------+-------------+----------------------
8 | 8 |
8 | 8 | ff80000000000001fff4
8 | 8 | ff80000000000003ffe8
8 | 8 | ff80000000000005ffdc
Range start key: ''
UUID | Server address | Replica Count
----------------------------------+----------------+---------------
15a8d0fef42c4da2bd5d9e1c5a2de301 | 127.0.0.1:9870 | 8
3243029e0db04680a2653c6acc048813 | 127.0.0.1:9876 | 0
324ef10666b14ab9bb61e775fa351ad6 | 127.0.0.1:9872 | 0
a5c6f822f5cc4645bbb4a14874e311d4 | 127.0.0.1:9874 | 0
Range start key: 'ff80000000000001fff4'
UUID | Server address | Replica Count
----------------------------------+----------------+---------------
15a8d0fef42c4da2bd5d9e1c5a2de301 | 127.0.0.1:9870 | 0
3243029e0db04680a2653c6acc048813 | 127.0.0.1:9876 | 8
324ef10666b14ab9bb61e775fa351ad6 | 127.0.0.1:9872 | 0
a5c6f822f5cc4645bbb4a14874e311d4 | 127.0.0.1:9874 | 0
Range start key: 'ff80000000000003ffe8'
UUID | Server address | Replica Count
----------------------------------+----------------+---------------
15a8d0fef42c4da2bd5d9e1c5a2de301 | 127.0.0.1:9870 | 0
3243029e0db04680a2653c6acc048813 | 127.0.0.1:9876 | 0
324ef10666b14ab9bb61e775fa351ad6 | 127.0.0.1:9872 | 8
a5c6f822f5cc4645bbb4a14874e311d4 | 127.0.0.1:9874 | 0
Range start key: 'ff80000000000005ffdc'
UUID | Server address | Replica Count
----------------------------------+----------------+---------------
15a8d0fef42c4da2bd5d9e1c5a2de301 | 127.0.0.1:9870 | 0
3243029e0db04680a2653c6acc048813 | 127.0.0.1:9876 | 0
324ef10666b14ab9bb61e775fa351ad6 | 127.0.0.1:9872 | 0
a5c6f822f5cc4645bbb4a14874e311d4 | 127.0.0.1:9874 | 8
after:
========================================================================
Table: abb2bbf8b4ff4bc0989bc82c78d4ae2b
Number of tablet replicas at servers for each range
Max Skew | Total Count | Range Start Key
----------+-------------+----------------------
0 | 8 |
0 | 8 | ff80000000000001fff4
0 | 8 | ff80000000000003ffe8
0 | 8 | ff80000000000005ffdc
Range start key: ''
UUID | Server address | Replica Count
----------------------------------+----------------+---------------
15a8d0fef42c4da2bd5d9e1c5a2de301 | 127.0.0.1:9870 | 2
3243029e0db04680a2653c6acc048813 | 127.0.0.1:9876 | 2
324ef10666b14ab9bb61e775fa351ad6 | 127.0.0.1:9872 | 2
a5c6f822f5cc4645bbb4a14874e311d4 | 127.0.0.1:9874 | 2
Range start key: 'ff80000000000001fff4'
UUID | Server address | Replica Count
----------------------------------+----------------+---------------
15a8d0fef42c4da2bd5d9e1c5a2de301 | 127.0.0.1:9870 | 2
3243029e0db04680a2653c6acc048813 | 127.0.0.1:9876 | 2
324ef10666b14ab9bb61e775fa351ad6 | 127.0.0.1:9872 | 2
a5c6f822f5cc4645bbb4a14874e311d4 | 127.0.0.1:9874 | 2
Range start key: 'ff80000000000003ffe8'
UUID | Server address | Replica Count
----------------------------------+----------------+---------------
15a8d0fef42c4da2bd5d9e1c5a2de301 | 127.0.0.1:9870 | 2
3243029e0db04680a2653c6acc048813 | 127.0.0.1:9876 | 2
324ef10666b14ab9bb61e775fa351ad6 | 127.0.0.1:9872 | 2
a5c6f822f5cc4645bbb4a14874e311d4 | 127.0.0.1:9874 | 2
Range start key: 'ff80000000000005ffdc'
UUID | Server address | Replica Count
----------------------------------+----------------+---------------
15a8d0fef42c4da2bd5d9e1c5a2de301 | 127.0.0.1:9870 | 2
3243029e0db04680a2653c6acc048813 | 127.0.0.1:9876 | 2
324ef10666b14ab9bb61e775fa351ad6 | 127.0.0.1:9872 | 2
a5c6f822f5cc4645bbb4a14874e311d4 | 127.0.0.1:9874 | 2
Change-Id: I7d2e19266e993f5e2ae13ba18d323c83db30eac1
Reviewed-on: http://gerrit.cloudera.org:8080/18294
Tested-by: Alexey Serbin <al...@apache.org>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/master/auto_rebalancer.cc | 2 +-
src/kudu/rebalance/rebalance-test.cc | 268 +++++++---
src/kudu/rebalance/rebalance_algo-test.cc | 826 +++++++++++++++++++++++-------
src/kudu/rebalance/rebalance_algo.cc | 74 ++-
src/kudu/rebalance/rebalance_algo.h | 48 +-
src/kudu/rebalance/rebalancer.cc | 32 +-
src/kudu/rebalance/rebalancer.h | 12 +-
src/kudu/tools/rebalancer_tool-test.cc | 153 +++++-
src/kudu/tools/rebalancer_tool.cc | 134 ++++-
src/kudu/tools/tool_action_cluster.cc | 13 +-
10 files changed, 1233 insertions(+), 329 deletions(-)
diff --git a/src/kudu/master/auto_rebalancer.cc b/src/kudu/master/auto_rebalancer.cc
index 789d75f21..88e9bc568 100644
--- a/src/kudu/master/auto_rebalancer.cc
+++ b/src/kudu/master/auto_rebalancer.cc
@@ -344,7 +344,7 @@ Status AutoRebalancerTask::GetMovesUsingRebalancingAlgo(
vector<Rebalancer::ReplicaMove> rep_moves;
for (const auto& move : moves) {
vector<string> tablet_ids;
- Rebalancer::FindReplicas(move, raw_info, &tablet_ids);
+ rebalancer_.FindReplicas(move, raw_info, &tablet_ids);
if (cross_location == CrossLocations::YES) {
// In case of cross-location (a.k.a. inter-location) rebalancing it is
// necessary to make sure the majority of replicas would not end up
diff --git a/src/kudu/rebalance/rebalance-test.cc b/src/kudu/rebalance/rebalance-test.cc
index 76afde6c3..94706d4a2 100644
--- a/src/kudu/rebalance/rebalance-test.cc
+++ b/src/kudu/rebalance/rebalance-test.cc
@@ -62,6 +62,7 @@ struct ServerHealthSummaryInput {
struct TabletSummaryInput {
std::string id;
std::string table_id;
+ std::string range_key_begin;
std::vector<ReplicaSummaryInput> replicas;
};
@@ -103,6 +104,7 @@ ClusterRawInfo GenerateRawClusterInfo(const KsckResultsInput& input) {
TabletSummary summary;
summary.id = summary_input.id;
summary.table_id = summary_input.table_id;
+ summary.range_key_begin = summary_input.range_key_begin;
auto& replicas = summary.replicas;
for (const auto& replica_input : summary_input.replicas) {
ReplicaSummary replica;
@@ -182,12 +184,13 @@ bool HasSameContents(const multimap<int32_t, T>& lhs, const multimap<int32_t, T>
bool operator==(const TableBalanceInfo& lhs, const TableBalanceInfo& rhs) {
return
lhs.table_id == rhs.table_id &&
+ lhs.tag == rhs.tag &&
HasSameContents(lhs.servers_by_replica_count,
rhs.servers_by_replica_count);
}
bool operator<(const TableBalanceInfo& lhs, const TableBalanceInfo& rhs) {
- return lhs.table_id < rhs.table_id;
+ return lhs.table_id + "." + lhs.tag < rhs.table_id + "." + rhs.tag;
}
bool operator==(const ClusterBalanceInfo& lhs, const ClusterBalanceInfo& rhs) {
@@ -205,7 +208,7 @@ ostream& operator<<(ostream& s, const ClusterBalanceInfo& info) {
s << " ]; [";
for (const auto& elem : info.table_info_by_skew) {
s << " " << elem.first << ":{ " << elem.second.table_id
- << " [";
+ << " '" << elem.second.tag << "' [";
for (const auto& e : elem.second.servers_by_replica_count) {
s << " " << e.first << ":" << e.second;
}
@@ -277,11 +280,11 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, MoveRf1Replicas) {
{
{
{ { "ts_0" }, },
- { { "tablet_0", "table_a", { { "ts_0", true }, }, }, },
+ { { "tablet_0", "table_a", "", { { "ts_0", true }, }, }, },
{ { "table_a", 1 }, },
},
{
- { { 0, { "table_a", { { 1, "ts_0" }, } } }, },
+ { { 0, { "table_a", "", { { 1, "ts_0" }, } } }, },
{ { 1, "ts_0" }, },
}
},
@@ -290,15 +293,15 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, MoveRf1Replicas) {
{
{ { "ts_0" }, { "ts_1" }, { "ts_2" }, },
{
- { "tablet_a0", "table_a", { { "ts_0", true }, }, },
- { "tablet_a0", "table_a", { { "ts_1", true }, }, },
- { "tablet_a0", "table_a", { { "ts_2", true }, }, },
+ { "tablet_a0", "table_a", "", { { "ts_0", true }, }, },
+ { "tablet_a0", "table_a", "", { { "ts_1", true }, }, },
+ { "tablet_a0", "table_a", "", { { "ts_2", true }, }, },
},
{ { "table_a", 3 } },
},
{
{
- { 0, { "table_a", {
+ { 0, { "table_a", "", {
{ 1, "ts_2" },
{ 1, "ts_1" },
{ 1, "ts_0" },
@@ -316,25 +319,25 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, MoveRf1Replicas) {
{
{ { "ts_0" }, { "ts_1" }, { "ts_2" }, },
{
- { "tablet_a_0", "table_a", { { "ts_0", true }, }, },
- { "tablet_b_0", "table_b", { { "ts_0", true }, }, },
- { "tablet_c_0", "table_c", { { "ts_0", true }, }, },
+ { "tablet_a_0", "table_a", "", { { "ts_0", true }, }, },
+ { "tablet_b_0", "table_b", "", { { "ts_0", true }, }, },
+ { "tablet_c_0", "table_c", "", { { "ts_0", true }, }, },
},
{ { { "table_a", 1 }, { "table_b", 1 }, { "table_c", 1 }, } },
},
{
{
- { 1, { "table_c", {
+ { 1, { "table_c", "", {
{ 0, "ts_1" }, { 0, "ts_2" }, { 1, "ts_0" },
}
}
},
- { 1, { "table_b", {
+ { 1, { "table_b", "", {
{ 0, "ts_1" }, { 0, "ts_2" }, { 1, "ts_0" },
}
}
},
- { 1, { "table_a", {
+ { 1, { "table_a", "", {
{ 0, "ts_1" }, { 0, "ts_2" }, { 1, "ts_0" },
}
}
@@ -352,30 +355,30 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, MoveRf1Replicas) {
{
{ { "ts_0" }, { "ts_1" }, { "ts_2" }, },
{
- { "tablet_a_0", "table_a", { { "ts_0", true }, }, },
- { "tablet_a_0", "table_a", { { "ts_1", true }, }, },
- { "tablet_a_0", "table_a", { { "ts_2", true }, }, },
- { "tablet_b_0", "table_b", { { "ts_0", true }, }, },
- { "tablet_b_1", "table_b", { { "ts_0", true }, }, },
- { "tablet_b_2", "table_b", { { "ts_0", true }, }, },
- { "tablet_c_0", "table_c", { { "ts_1", true }, }, },
- { "tablet_c_1", "table_c", { { "ts_1", true }, }, },
+ { "tablet_a_0", "table_a", "", { { "ts_0", true }, }, },
+ { "tablet_a_0", "table_a", "", { { "ts_1", true }, }, },
+ { "tablet_a_0", "table_a", "", { { "ts_2", true }, }, },
+ { "tablet_b_0", "table_b", "", { { "ts_0", true }, }, },
+ { "tablet_b_1", "table_b", "", { { "ts_0", true }, }, },
+ { "tablet_b_2", "table_b", "", { { "ts_0", true }, }, },
+ { "tablet_c_0", "table_c", "", { { "ts_1", true }, }, },
+ { "tablet_c_1", "table_c", "", { { "ts_1", true }, }, },
},
{ { { "table_a", 3 }, { "table_b", 1 }, { "table_c", 1 }, } },
},
{
{
- { 2, { "table_c", {
+ { 2, { "table_c", "", {
{ 0, "ts_0" }, { 0, "ts_2" }, { 2, "ts_1" },
}
}
},
- { 3, { "table_b", {
+ { 3, { "table_b", "", {
{ 0, "ts_1" }, { 0, "ts_2" }, { 3, "ts_0" },
}
}
},
- { 0, { "table_a", {
+ { 0, { "table_a", "", {
{ 1, "ts_2" }, { 1, "ts_1" }, { 1, "ts_0" },
}
}
@@ -415,7 +418,7 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, DoNotMoveRf1Replicas) {
{
{
{ { "ts_0" }, },
- { { "tablet_0", "table_a", { { "ts_0", true }, }, }, },
+ { { "tablet_0", "table_a", "", { { "ts_0", true }, }, }, },
{ { "table_a", 1 }, },
},
{
@@ -428,9 +431,9 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, DoNotMoveRf1Replicas) {
{
{ { "ts_0" }, { "ts_1" }, },
{
- { "tablet_a0", "table_a", { { "ts_0", true }, }, },
- { "tablet_b0", "table_b", { { "ts_0", true }, }, },
- { "tablet_b1", "table_b", { { "ts_1", true }, }, },
+ { "tablet_a0", "table_a", "", { { "ts_0", true }, }, },
+ { "tablet_b0", "table_b", "", { { "ts_0", true }, }, },
+ { "tablet_b1", "table_b", "", { { "ts_1", true }, }, },
},
{ { "table_a", 1 }, { "table_b", 1 } },
},
@@ -446,14 +449,14 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, DoNotMoveRf1Replicas) {
{
{ { "ts_0" }, { "ts_1" }, { "ts_2" }, },
{
- { "tablet_a_0", "table_a", { { "ts_0", true }, }, },
- { "tablet_a_0", "table_a", { { "ts_1", true }, }, },
- { "tablet_a_0", "table_a", { { "ts_2", true }, }, },
- { "tablet_b_0", "table_b", { { "ts_0", true }, }, },
- { "tablet_b_1", "table_b", { { "ts_0", true }, }, },
- { "tablet_b_2", "table_b", { { "ts_0", true }, }, },
- { "tablet_c_0", "table_c", { { "ts_1", true }, }, },
- { "tablet_c_1", "table_c", { { "ts_1", true }, }, },
+ { "tablet_a_0", "table_a", "", { { "ts_0", true }, }, },
+ { "tablet_a_0", "table_a", "", { { "ts_1", true }, }, },
+ { "tablet_a_0", "table_a", "", { { "ts_2", true }, }, },
+ { "tablet_b_0", "table_b", "", { { "ts_0", true }, }, },
+ { "tablet_b_1", "table_b", "", { { "ts_0", true }, }, },
+ { "tablet_b_2", "table_b", "", { { "ts_0", true }, }, },
+ { "tablet_c_0", "table_c", "", { { "ts_1", true }, }, },
+ { "tablet_c_1", "table_c", "", { { "ts_1", true }, }, },
},
{ { { "table_a", 3 }, { "table_b", 1 }, { "table_c", 1 }, } },
},
@@ -461,7 +464,7 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, DoNotMoveRf1Replicas) {
{
{
0, {
- "table_a", {
+ "table_a", "", {
{ 1, "ts_2" }, { 1, "ts_1" }, { 1, "ts_0" },
}
}
@@ -500,24 +503,24 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, MoveIgnoredTserversReplicas) {
{
{ { "ts_0" }, { "ts_1" }, { "ts_2" }, { "ts_3" }, { "ts_4" }, },
{
- { "tablet_a_0", "table_a", { { "ts_0", true }, }, },
- { "tablet_a_0", "table_a", { { "ts_1", true }, }, },
- { "tablet_a_0", "table_a", { { "ts_2", true }, }, },
- { "tablet_b_0", "table_b", { { "ts_1", true }, }, },
- { "tablet_b_0", "table_b", { { "ts_2", true }, }, },
- { "tablet_b_0", "table_b", { { "ts_3", true }, }, },
- { "tablet_b_1", "table_b", { { "ts_2", true }, }, },
- { "tablet_b_1", "table_b", { { "ts_3", true }, }, },
- { "tablet_b_1", "table_b", { { "ts_4", true }, }, },
- { "tablet_c_0", "table_c", { { "ts_1", true }, }, },
- { "tablet_c_0", "table_c", { { "ts_2", true }, }, },
- { "tablet_c_0", "table_c", { { "ts_3", true }, }, },
- { "tablet_c_1", "table_c", { { "ts_1", true }, }, },
- { "tablet_c_1", "table_c", { { "ts_2", true }, }, },
- { "tablet_c_1", "table_c", { { "ts_3", true }, }, },
- { "tablet_c_2", "table_c", { { "ts_2", true }, }, },
- { "tablet_c_2", "table_c", { { "ts_3", true }, }, },
- { "tablet_c_2", "table_c", { { "ts_4", true }, }, },
+ { "tablet_a_0", "table_a", "", { { "ts_0", true }, }, },
+ { "tablet_a_0", "table_a", "", { { "ts_1", true }, }, },
+ { "tablet_a_0", "table_a", "", { { "ts_2", true }, }, },
+ { "tablet_b_0", "table_b", "", { { "ts_1", true }, }, },
+ { "tablet_b_0", "table_b", "", { { "ts_2", true }, }, },
+ { "tablet_b_0", "table_b", "", { { "ts_3", true }, }, },
+ { "tablet_b_1", "table_b", "", { { "ts_2", true }, }, },
+ { "tablet_b_1", "table_b", "", { { "ts_3", true }, }, },
+ { "tablet_b_1", "table_b", "", { { "ts_4", true }, }, },
+ { "tablet_c_0", "table_c", "", { { "ts_1", true }, }, },
+ { "tablet_c_0", "table_c", "", { { "ts_2", true }, }, },
+ { "tablet_c_0", "table_c", "", { { "ts_3", true }, }, },
+ { "tablet_c_1", "table_c", "", { { "ts_1", true }, }, },
+ { "tablet_c_1", "table_c", "", { { "ts_2", true }, }, },
+ { "tablet_c_1", "table_c", "", { { "ts_3", true }, }, },
+ { "tablet_c_2", "table_c", "", { { "ts_2", true }, }, },
+ { "tablet_c_2", "table_c", "", { { "ts_3", true }, }, },
+ { "tablet_c_2", "table_c", "", { { "ts_4", true }, }, },
},
{ { { "table_a", 3 }, { "table_b", 3 }, { "table_c", 3 }, } },
},
@@ -525,21 +528,21 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, MoveIgnoredTserversReplicas) {
{
{
1, {
- "table_a", {
+ "table_a", "", {
{ 0, "ts_4" }, { 0, "ts_3" }, { 1, "ts_2" },
}
}
},
{
1, {
- "table_b", {
+ "table_b", "", {
{ 1, "ts_4" }, { 2, "ts_3" }, { 2, "ts_2" },
}
}
},
{
2, {
- "table_c", {
+ "table_c", "", {
{ 1, "ts_4" }, { 3, "ts_3" }, { 3, "ts_2" },
}
}
@@ -555,5 +558,152 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, MoveIgnoredTserversReplicas) {
NO_FATALS(RunTest(rebalancer_config, test_configs));
}
+TEST_F(KsckResultsToClusterBalanceInfoTest, RangeRebalancingInfo) {
+ const vector<KsckResultsTestConfig> test_configs = {
+ // Empty
+ {
+ {},
+ {}
+ },
+ // One tserver, one table, one tablet, RF=1.
+ {
+ {
+ { { "ts_0" }, },
+ { { "tablet_0", "table_a", "x", { { "ts_0", true }, }, }, },
+ { { "table_a", 1 }, },
+ },
+ {
+ {},
+ { { 0, "ts_0" }, }
+ }
+ },
+ // table_a: 1 tablet with RF=3
+ // table_b: 3 tablets with RF=1
+ // table_c: 2 tablets with RF=1
+ {
+ {
+ { { "ts_0" }, { "ts_1" }, { "ts_2" }, },
+ {
+ { "tablet_a_0", "table_a", "x", { { "ts_0", true }, }, },
+ { "tablet_a_0", "table_a", "x", { { "ts_1", true }, }, },
+ { "tablet_a_0", "table_a", "x", { { "ts_2", true }, }, },
+ { "tablet_b_0", "table_b", "x", { { "ts_0", true }, }, },
+ { "tablet_b_1", "table_b", "x", { { "ts_0", true }, }, },
+ { "tablet_b_2", "table_b", "x", { { "ts_0", true }, }, },
+ { "tablet_c_0", "table_c", "x", { { "ts_1", true }, }, },
+ { "tablet_c_1", "table_c", "x", { { "ts_1", true }, }, },
+ },
+ { { { "table_a", 3 }, { "table_b", 1 }, { "table_c", 1 }, } },
+ },
+ {
+ {
+ {
+ 0, {
+ "table_a", "x", {
+ { 1, "ts_2" }, { 1, "ts_1" }, { 1, "ts_0" },
+ }
+ }
+ },
+ },
+ {
+ { 1, "ts_2" }, { 1, "ts_1" }, { 1, "ts_0" },
+ },
+ }
+ },
+ {
+ {
+ { { "ts_0" }, { "ts_1" }, { "ts_2" }, },
+ {
+ { "tablet_a_0", "table_a", "x", { { "ts_0", true }, }, },
+ { "tablet_a_0", "table_a", "x", { { "ts_1", true }, }, },
+ { "tablet_a_0", "table_a", "x", { { "ts_2", true }, }, },
+ { "tablet_b_0", "table_b", "y", { { "ts_0", true }, }, },
+ { "tablet_b_1", "table_b", "y", { { "ts_0", true }, }, },
+ { "tablet_b_2", "table_b", "y", { { "ts_0", true }, }, },
+ { "tablet_c_0", "table_c", "z", { { "ts_1", true }, }, },
+ { "tablet_c_1", "table_c", "z", { { "ts_1", true }, }, },
+ { "tablet_c_2", "table_c", "z", { { "ts_2", true }, }, },
+ },
+ { { { "table_a", 3 }, { "table_b", 3 }, { "table_c", 3 }, } },
+ },
+ {
+ {
+ {
+ 0, {
+ "table_a", "x", {
+ { 1, "ts_2" }, { 1, "ts_1" }, { 1, "ts_0" },
+ }
+ }
+ },
+ {
+ 3, {
+ "table_b", "y", {
+ { 0, "ts_2" }, { 0, "ts_1" }, { 3, "ts_0" },
+ }
+ }
+ },
+ {
+ 2, {
+ "table_c", "z", {
+ { 1, "ts_2" }, { 2, "ts_1" }, { 0, "ts_0" },
+ }
+ }
+ },
+ },
+ {
+ { 2, "ts_2" }, { 3, "ts_1" }, { 4, "ts_0" },
+ },
+ },
+ },
+ {
+ {
+ { { "ts_0" }, { "ts_1" }, { "ts_2" }, },
+ {
+ { "tablet_r0h0", "table_a", "0", { { "ts_0", true }, }, },
+ { "tablet_r1h0", "table_a", "1", { { "ts_1", true }, }, },
+ { "tablet_r2h0", "table_a", "2", { { "ts_2", true }, }, },
+ { "tablet_r0h1", "table_a", "0", { { "ts_0", true }, }, },
+ { "tablet_r1h1", "table_a", "1", { { "ts_0", true }, }, },
+ { "tablet_r2h1", "table_a", "2", { { "ts_0", true }, }, },
+ { "tablet_r0h2", "table_a", "0", { { "ts_1", true }, }, },
+ { "tablet_r1h2", "table_a", "1", { { "ts_1", true }, }, },
+ { "tablet_r2h2", "table_a", "2", { { "ts_1", true }, }, },
+ },
+ { { { "table_a", 3 }, } },
+ },
+ {
+ {
+ {
+ 2, {
+ "table_a", "0", {
+ { 0, "ts_2" }, { 1, "ts_1" }, { 2, "ts_0" },
+ }
+ }
+ },
+ {
+ 2, {
+ "table_a", "1", {
+ { 0, "ts_2" }, { 2, "ts_1" }, { 1, "ts_0" },
+ }
+ }
+ },
+ {
+ 0, {
+ "table_a", "2", {
+ { 1, "ts_2" }, { 1, "ts_1" }, { 1, "ts_0" },
+ }
+ }
+ },
+ },
+ {
+ { 1, "ts_2" }, { 4, "ts_1" }, { 4, "ts_0" },
+ },
+ }
+ },
+ };
+
+ NO_FATALS(RunTest(Rebalancer::Config(), test_configs));
+}
+
} // namespace rebalance
} // namespace kudu
diff --git a/src/kudu/rebalance/rebalance_algo-test.cc b/src/kudu/rebalance/rebalance_algo-test.cc
index adb9c5e5c..0b89bb363 100644
--- a/src/kudu/rebalance/rebalance_algo-test.cc
+++ b/src/kudu/rebalance/rebalance_algo-test.cc
@@ -67,6 +67,7 @@ struct TestClusterConfig;
using std::endl;
using std::ostream;
using std::ostringstream;
+using std::pair;
using std::set;
using std::sort;
using std::string;
@@ -79,6 +80,7 @@ namespace rebalance {
struct TablePerServerReplicas {
const string table_id;
+ const string tag;
// Number of replicas of this table on each server in the cluster.
// By definition, the indices in this container correspond to indices
@@ -131,12 +133,14 @@ struct TestClusterConfig {
bool operator==(const TableReplicaMove& lhs, const TableReplicaMove& rhs) {
return
lhs.table_id == rhs.table_id &&
+ lhs.tag == rhs.tag &&
lhs.from == rhs.from &&
lhs.to == rhs.to;
}
ostream& operator<<(ostream& o, const TableReplicaMove& move) {
- o << move.table_id << ":" << move.from << "->" << move.to;
+ o << move.table_id << " (" << move.tag << ") " << ":"
+ << move.from << "->" << move.to;
return o;
}
@@ -145,13 +149,14 @@ ostream& operator<<(ostream& o, const TableReplicaMove& move) {
void ClusterConfigToClusterInfo(const TestClusterConfig& tcc,
ClusterInfo* cluster_info) {
// First verify that the configuration of the test cluster is valid.
- set<string> table_ids;
+ set<pair<string, string>> table_ids_and_tags;
for (const auto& table_replica_info : tcc.table_replicas) {
CHECK_EQ(tcc.tserver_uuids.size(),
table_replica_info.num_replicas_by_server.size());
- table_ids.emplace(table_replica_info.table_id);
+ table_ids_and_tags.emplace(
+ table_replica_info.table_id, table_replica_info.tag);
}
- CHECK_EQ(table_ids.size(), tcc.table_replicas.size());
+ CHECK_EQ(table_ids_and_tags.size(), tcc.table_replicas.size());
{
// Check for uniqueness of the tablet servers' identifiers.
set<string> uuids(tcc.tserver_uuids.begin(), tcc.tserver_uuids.end());
@@ -178,6 +183,7 @@ void ClusterConfigToClusterInfo(const TestClusterConfig& tcc,
tcc.table_replicas[table_idx].num_replicas_by_server;
TableBalanceInfo info;
info.table_id = tcc.table_replicas[table_idx].table_id;
+ info.tag = tcc.table_replicas[table_idx].tag;
for (size_t tserver_idx = 0; tserver_idx < replicas_count.size(); ++tserver_idx) {
auto count = replicas_count[tserver_idx];
info.servers_by_replica_count.emplace(count, tcc.tserver_uuids[tserver_idx]);
@@ -240,6 +246,9 @@ void VerifyLocationRebalancingMoves(const TestClusterConfig& cfg) {
if (lhs.table_id != rhs.table_id) {
return lhs.table_id < rhs.table_id;
}
+ if (lhs.tag != rhs.tag) {
+ return lhs.tag < rhs.tag;
+ }
if (lhs.from != rhs.from) {
return lhs.from < rhs.from;
}
@@ -341,7 +350,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
kNoLocations,
{ "0", },
{
- { "A", { 1 } },
+ { "A", "", { 1 } },
},
},
{
@@ -349,9 +358,9 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
kNoLocations,
{ "0", },
{
- { "A", { 1 } },
- { "B", { 10 } },
- { "C", { 100 } },
+ { "A", "", { 1 } },
+ { "B", "", { 10 } },
+ { "C", "", { 100 } },
},
},
{
@@ -359,7 +368,7 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
kNoLocations,
{ "0", "1", },
{
- { "A", { 100, 99, } },
+ { "A", "", { 100, 99, } },
},
},
{
@@ -367,8 +376,8 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
kNoLocations,
{ "0", "1", },
{
- { "A", { 1, 1, } },
- { "B", { 1, 2, } },
+ { "A", "", { 1, 1, } },
+ { "B", "", { 1, 2, } },
},
},
{
@@ -378,19 +387,19 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
kNoLocations,
{ "0", "1", },
{
- { "A", { 1, 2, } },
- { "B", { 1, 2, } },
- { "C", { 1, 0, } },
- { "D", { 1, 0, } },
+ { "A", "", { 1, 2, } },
+ { "B", "", { 1, 2, } },
+ { "C", "", { 1, 0, } },
+ { "D", "", { 1, 0, } },
},
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 1, 0, 0, } },
- { "B", { 0, 1, 0, } },
- { "C", { 0, 0, 1, } },
+ { "A", "", { 1, 0, 0, } },
+ { "B", "", { 0, 1, 0, } },
+ { "C", "", { 0, 0, 1, } },
},
},
{
@@ -399,89 +408,274 @@ TEST(RebalanceAlgoUnitTest, AlreadyBalanced) {
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 1, 1, 1, } },
- { "B", { 1, 1, 1, } },
- { "C", { 1, 1, 1, } },
+ { "A", "", { 1, 1, 1, } },
+ { "B", "", { 1, 1, 1, } },
+ { "C", "", { 1, 1, 1, } },
},
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 0, 1, 1, } },
- { "B", { 1, 0, 1, } },
- { "C", { 1, 1, 0, } },
+ { "A", "", { 0, 1, 1, } },
+ { "B", "", { 1, 0, 1, } },
+ { "C", "", { 1, 1, 0, } },
},
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 2, 1, 1, } },
- { "B", { 1, 2, 1, } },
- { "C", { 1, 1, 2, } },
+ { "A", "", { 2, 1, 1, } },
+ { "B", "", { 1, 2, 1, } },
+ { "C", "", { 1, 1, 2, } },
},
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 1, 1, 0, } },
- { "B", { 1, 1, 0, } },
- { "C", { 1, 0, 1, } },
- { "D", { 1, 0, 1, } },
- { "E", { 0, 1, 1, } },
- { "F", { 0, 1, 1, } },
+ { "A", "", { 1, 1, 0, } },
+ { "B", "", { 1, 1, 0, } },
+ { "C", "", { 1, 0, 1, } },
+ { "D", "", { 1, 0, 1, } },
+ { "E", "", { 0, 1, 1, } },
+ { "F", "", { 0, 1, 1, } },
},
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 1, 0, 1, } },
- { "B", { 1, 1, 0, } },
+ { "A", "", { 1, 0, 1, } },
+ { "B", "", { 1, 1, 0, } },
},
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "B", { 1, 0, 1, } },
- { "A", { 1, 1, 0, } },
+ { "B", "", { 1, 0, 1, } },
+ { "A", "", { 1, 1, 0, } },
},
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 2, 2, 1, } },
- { "B", { 1, 0, 1, } },
+ { "A", "", { 2, 2, 1, } },
+ { "B", "", { 1, 0, 1, } },
},
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 2, 2, 1, } },
- { "B", { 1, 1, 1, } },
+ { "A", "", { 2, 2, 1, } },
+ { "B", "", { 1, 1, 1, } },
},
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 2, 2, 1, } },
- { "B", { 0, 0, 1, } },
- { "C", { 0, 0, 1, } },
+ { "A", "", { 2, 2, 1, } },
+ { "B", "", { 0, 0, 1, } },
+ { "C", "", { 0, 0, 1, } },
},
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 0, 1, 0, } },
- { "B", { 1, 0, 1, } },
- { "C", { 1, 0, 1, } },
+ { "A", "", { 0, 1, 0, } },
+ { "B", "", { 1, 0, 1, } },
+ { "C", "", { 1, 0, 1, } },
+ },
+ },
+ };
+ VERIFY_MOVES(kConfigs);
+}
+
+// Various scenarios of balanced configurations where no moves are expected
+// to happen, exercising range balancing.
+TEST(RebalanceAlgoUnitTest, AlreadyBalancedRanges) {
+ // The configurations are already balanced, no moves should be attempted.
+ const TestClusterConfig kConfigs[] = {
+ {
+ // A single tablet server with a single replica of the only table.
+ kNoLocations,
+ { "0", },
+ {
+ { "A", "R0", { 1 } },
+ },
+ },
+ {
+ // A single tablet server in the cluster that hosts all replicas.
+ kNoLocations,
+ { "0", },
+ {
+ { "A", "R0", { 1 } },
+ { "B", "R0", { 10 } },
+ { "C", "R0", { 100 } },
+ },
+ },
+ {
+ // A single tablet server in the cluster that hosts all replicas.
+ kNoLocations,
+ { "0", },
+ {
+ { "A", "R0", { 1 } },
+ { "B", "R1", { 10 } },
+ { "C", "R2", { 100 } },
+ },
+ },
+ {
+ // Single table and 2 TS: 100 and 99 replicas at each.
+ kNoLocations,
+ { "0", "1", },
+ {
+ { "A", "R0", { 100, 99, } },
+ },
+ },
+ {
+ // Table- and cluster-wise balanced configuration with one-off skew.
+ kNoLocations,
+ { "0", "1", },
+ {
+ { "A", "R0", { 1, 1, } },
+ { "B", "R0", { 1, 2, } },
+ },
+ },
+ {
+ // Table- and cluster-wise balanced configuration with one-off skew.
+ kNoLocations,
+ { "0", "1", },
+ {
+ { "A", "R0", { 1, 1, } },
+ { "B", "R1", { 1, 2, } },
+ },
+ },
+ {
+ // A configuration which has zero skew cluster-wise, while the table-wise
+ // balance has one-off skew: the algorithm should not try to correct
+ // the latter.
+ kNoLocations,
+ { "0", "1", },
+ {
+ { "A", "R0", { 1, 2, } },
+ { "B", "R1", { 1, 2, } },
+ { "C", "R2", { 1, 0, } },
+ { "D", "R3", { 1, 0, } },
+ },
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 1, 0, 0, } },
+ { "B", "R0", { 0, 1, 0, } },
+ { "C", "R1", { 0, 0, 1, } },
+ },
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 1, 0, 0, } },
+ { "B", "R1", { 0, 1, 0, } },
+ { "C", "R2", { 0, 0, 1, } },
+ },
+ },
+ {
+ // A simple balanced case: 3 tablet servers, 3 tables with
+ // one replica per server.
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 1, 1, 1, } },
+ { "B", "R1", { 1, 1, 1, } },
+ { "C", "R2", { 1, 1, 1, } },
+ },
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 0, 1, 1, } },
+ { "B", "R1", { 1, 0, 1, } },
+ { "C", "R2", { 1, 1, 0, } },
+ },
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 2, 1, 1, } },
+ { "B", "R1", { 1, 2, 1, } },
+ { "C", "R2", { 1, 1, 2, } },
+ },
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 1, 1, 0, } },
+ { "B", "R1", { 1, 1, 0, } },
+ { "C", "R2", { 1, 0, 1, } },
+ { "D", "R1", { 1, 0, 1, } },
+ { "E", "R2", { 0, 1, 1, } },
+ { "F", "R0", { 0, 1, 1, } },
+ },
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 1, 0, 1, } },
+ { "B", "R1", { 1, 1, 0, } },
+ },
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "B", "R0", { 1, 0, 1, } },
+ { "A", "R1", { 1, 1, 0, } },
+ },
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 2, 2, 1, } },
+ { "B", "R1", { 1, 0, 1, } },
+ },
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R1", { 2, 2, 1, } },
+ { "B", "R0", { 1, 1, 1, } },
+ },
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 2, 2, 1, } },
+ { "B", "R1", { 0, 0, 1, } },
+ { "C", "R1", { 0, 0, 1, } },
+ },
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R1", { 0, 1, 0, } },
+ { "B", "R0", { 1, 0, 1, } },
+ { "C", "R2", { 1, 0, 1, } },
},
},
};
@@ -497,61 +691,61 @@ TEST(RebalanceAlgoUnitTest, TableWiseBalanced) {
kNoLocations,
{ "0", "1", },
{
- { "A", { 100, 99, } },
- { "B", { 100, 99, } },
+ { "A", "", { 100, 99, } },
+ { "B", "", { 100, 99, } },
},
- { { "A", "0", "1" }, }
+ { { "A", "", "0", "1" }, }
},
{
kNoLocations,
{ "0", "1", },
{
- { "A", { 1, 2, } },
- { "B", { 1, 2, } },
- { "C", { 1, 0, } },
- { "D", { 0, 1, } },
+ { "A", "", { 1, 2, } },
+ { "B", "", { 1, 2, } },
+ { "C", "", { 1, 0, } },
+ { "D", "", { 0, 1, } },
},
- { { "A", "1", "0" }, }
+ { { "A", "", "1", "0" }, }
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 1, 0, 0, } },
- { "B", { 0, 1, 0, } },
- { "C", { 1, 0, 0, } },
+ { "A", "", { 1, 0, 0, } },
+ { "B", "", { 0, 1, 0, } },
+ { "C", "", { 1, 0, 0, } },
},
- { { "A", "0", "2" }, }
+ { { "A", "", "0", "2" }, }
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 1, 1, 1, } },
- { "B", { 0, 1, 1, } },
- { "C", { 0, 0, 1, } },
+ { "A", "", { 1, 1, 1, } },
+ { "B", "", { 0, 1, 1, } },
+ { "C", "", { 0, 0, 1, } },
},
- { { "B", "2", "0" }, }
+ { { "B", "", "2", "0" }, }
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 1, 1, 0, } },
- { "B", { 1, 0, 1, } },
- { "C", { 1, 0, 1, } },
+ { "A", "", { 1, 1, 0, } },
+ { "B", "", { 1, 0, 1, } },
+ { "C", "", { 1, 0, 1, } },
},
- { { "B", "0", "1" }, }
+ { { "B", "", "0", "1" }, }
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "C", { 1, 0, 1, } },
- { "B", { 1, 0, 1, } },
- { "A", { 1, 1, 0, } },
+ { "C", "", { 1, 0, 1, } },
+ { "B", "", { 1, 0, 1, } },
+ { "A", "", { 1, 1, 0, } },
},
- { { "C", "0", "1" }, }
+ { { "C", "", "0", "1" }, }
},
};
VERIFY_MOVES(kConfigs);
@@ -567,61 +761,61 @@ TEST(RebalanceAlgoUnitTest, OneMoveNoCycling) {
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 1, 0, 1, } },
- { "B", { 1, 0, 1, } },
- { "C", { 1, 1, 0, } },
+ { "A", "", { 1, 0, 1, } },
+ { "B", "", { 1, 0, 1, } },
+ { "C", "", { 1, 1, 0, } },
},
- { { "A", "0", "1" }, }
+ { { "A", "", "0", "1" }, }
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 1, 0, 1, } },
- { "C", { 1, 0, 1, } },
- { "B", { 1, 1, 0, } },
+ { "A", "", { 1, 0, 1, } },
+ { "C", "", { 1, 0, 1, } },
+ { "B", "", { 1, 1, 0, } },
},
- { { "A", "0", "1" }, }
+ { { "A", "", "0", "1" }, }
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "B", { 1, 0, 1, } },
- { "C", { 1, 0, 1, } },
- { "A", { 1, 1, 0, } },
+ { "B", "", { 1, 0, 1, } },
+ { "C", "", { 1, 0, 1, } },
+ { "A", "", { 1, 1, 0, } },
},
- { { "B", "0", "1" }, }
+ { { "B", "", "0", "1" }, }
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "B", { 1, 0, 1, } },
- { "A", { 1, 0, 1, } },
- { "C", { 1, 1, 0, } },
+ { "B", "", { 1, 0, 1, } },
+ { "A", "", { 1, 0, 1, } },
+ { "C", "", { 1, 1, 0, } },
},
- { { "B", "0", "1" }, }
+ { { "B", "", "0", "1" }, }
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "C", { 1, 0, 1, } },
- { "A", { 1, 0, 1, } },
- { "B", { 1, 1, 0, } },
+ { "C", "", { 1, 0, 1, } },
+ { "A", "", { 1, 0, 1, } },
+ { "B", "", { 1, 1, 0, } },
},
- { { "C", "0", "1" }, }
+ { { "C", "", "0", "1" }, }
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "C", { 1, 0, 1, } },
- { "B", { 1, 0, 1, } },
- { "A", { 1, 1, 0, } },
+ { "C", "", { 1, 0, 1, } },
+ { "B", "", { 1, 0, 1, } },
+ { "A", "", { 1, 1, 0, } },
},
- { { "C", "0", "1" }, }
+ { { "C", "", "0", "1" }, }
},
};
VERIFY_MOVES(kConfigs);
@@ -636,49 +830,49 @@ TEST(RebalanceAlgoUnitTest, ClusterWiseBalanced) {
kNoLocations,
{ "0", "1", },
{
- { "A", { 2, 0, } },
- { "B", { 1, 2, } },
+ { "A", "", { 2, 0, } },
+ { "B", "", { 1, 2, } },
},
{
- { "A", "0", "1" },
+ { "A", "", "0", "1" },
}
},
{
kNoLocations,
{ "0", "1", },
{
- { "A", { 1, 2, } },
- { "B", { 2, 0, } },
- { "C", { 1, 2, } },
+ { "A", "", { 1, 2, } },
+ { "B", "", { 2, 0, } },
+ { "C", "", { 1, 2, } },
},
{
- { "B", "0", "1" },
- { "A", "1", "0" },
+ { "B", "", "0", "1" },
+ { "A", "", "1", "0" },
}
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 2, 1, 0, } },
- { "B", { 0, 1, 2, } },
+ { "A", "", { 2, 1, 0, } },
+ { "B", "", { 0, 1, 2, } },
},
{
- { "A", "0", "2" },
- { "B", "2", "0" },
+ { "A", "", "0", "2" },
+ { "B", "", "2", "0" },
}
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 2, 1, 0, } },
- { "B", { 0, 1, 2, } },
- { "C", { 1, 1, 2, } },
+ { "A", "", { 2, 1, 0, } },
+ { "B", "", { 0, 1, 2, } },
+ { "C", "", { 1, 1, 2, } },
},
{
- { "A", "0", "2" },
- { "B", "2", "0" },
+ { "A", "", "0", "2" },
+ { "B", "", "2", "0" },
}
},
};
@@ -693,108 +887,236 @@ TEST(RebalanceAlgoUnitTest, FewMoves) {
kNoLocations,
{ "0", "1", },
{
- { "A", { 2, 0, } },
+ { "A", "", { 2, 0, } },
},
- { { "A", "0", "1" }, }
+ { { "A", "", "0", "1" }, }
},
{
kNoLocations,
{ "0", "1", },
{
- { "A", { 3, 0, } },
+ { "A", "", { 3, 0, } },
},
- { { "A", "0", "1" }, }
+ { { "A", "", "0", "1" }, }
},
{
kNoLocations,
{ "0", "1", },
{
- { "A", { 4, 0, } },
+ { "A", "", { 4, 0, } },
},
{
- { "A", "0", "1" },
- { "A", "0", "1" },
+ { "A", "", "0", "1" },
+ { "A", "", "0", "1" },
}
},
{
kNoLocations,
{ "0", "1", },
{
- { "A", { 1, 2, } },
- { "B", { 2, 0, } },
- { "C", { 2, 1, } },
+ { "A", "", { 1, 2, } },
+ { "B", "", { 2, 0, } },
+ { "C", "", { 2, 1, } },
},
{
- { "B", "0", "1" },
+ { "B", "", "0", "1" },
}
},
{
kNoLocations,
{ "0", "1", },
{
- { "A", { 4, 0, } },
- { "B", { 1, 3, } },
+ { "A", "", { 4, 0, } },
+ { "B", "", { 1, 3, } },
},
{
- { "A", "0", "1" },
- { "B", "1", "0" },
- { "A", "0", "1" },
+ { "A", "", "0", "1" },
+ { "B", "", "1", "0" },
+ { "A", "", "0", "1" },
}
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 4, 2, 0, } },
- { "B", { 2, 1, 0, } },
- { "C", { 1, 1, 1, } },
+ { "A", "", { 4, 2, 0, } },
+ { "B", "", { 2, 1, 0, } },
+ { "C", "", { 1, 1, 1, } },
},
{
- { "A", "0", "2" },
- { "B", "0", "2" },
- { "A", "0", "2" },
+ { "A", "", "0", "2" },
+ { "B", "", "0", "2" },
+ { "A", "", "0", "2" },
}
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 2, 1, 0, } },
- { "B", { 3, 2, 1, } },
- { "C", { 2, 3, 5, } },
+ { "A", "", { 2, 1, 0, } },
+ { "B", "", { 3, 2, 1, } },
+ { "C", "", { 2, 3, 5, } },
},
{
- { "C", "2", "0" },
- { "A", "0", "2" },
- { "B", "0", "2" },
+ { "C", "", "2", "0" },
+ { "A", "", "0", "2" },
+ { "B", "", "0", "2" },
}
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 5, 1, 0, } },
+ { "A", "", { 5, 1, 0, } },
},
{
- { "A", "0", "2" },
- { "A", "0", "1" },
- { "A", "0", "2" },
+ { "A", "", "0", "2" },
+ { "A", "", "0", "1" },
+ { "A", "", "0", "2" },
}
},
{
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 5, 1, 0, } },
- { "B", { 0, 1, 5, } },
+ { "A", "", { 5, 1, 0, } },
+ { "B", "", { 0, 1, 5, } },
},
{
- { "A", "0", "2" },
- { "B", "2", "0" },
- { "A", "0", "1" },
- { "B", "2", "1" },
- { "A", "0", "2" },
- { "B", "2", "0" },
+ { "A", "", "0", "2" },
+ { "B", "", "2", "0" },
+ { "A", "", "0", "1" },
+ { "B", "", "2", "1" },
+ { "A", "", "0", "2" },
+ { "B", "", "2", "0" },
+ }
+ },
+ };
+ VERIFY_MOVES(kConfigs);
+}
+
+TEST(RebalanceAlgoUnitTest, FewMovesSameTableRanges) {
+ const TestClusterConfig kConfigs[] = {
+ {
+ kNoLocations,
+ { "0", "1", },
+ {
+ { "A", "R0", { 2, 0, } },
+ },
+ { { "A", "R0", "0", "1" }, }
+ },
+ {
+ kNoLocations,
+ { "0", "1", },
+ {
+ { "A", "R0", { 3, 0, } },
+ },
+ { { "A", "R0", "0", "1" }, }
+ },
+ {
+ kNoLocations,
+ { "0", "1", },
+ {
+ { "A", "R0", { 4, 0, } },
+ { "A", "", { 1, 3, } },
+ },
+ {
+ { "A", "R0", "0", "1" },
+ { "A", "", "1", "0" },
+ { "A", "R0", "0", "1" },
+ }
+ },
+ {
+ kNoLocations,
+ { "0", "1", },
+ {
+ { "A", "R0", { 1, 2, } },
+ { "A", "R1", { 2, 0, } },
+ { "A", "R2", { 2, 1, } },
+ { "A", "", { 2, 2, } },
+ },
+ {
+ { "A", "R1", "0", "1" },
+ }
+ },
+ {
+ kNoLocations,
+ { "0", "1", },
+ {
+ { "A", "R0", { 4, 0, } },
+ { "A", "R1", { 1, 3, } },
+ { "A", "", { 0, 2, } },
+ },
+ {
+ { "A", "R0", "0", "1" },
+ { "A", "R1", "1", "0" },
+ { "A", "", "1", "0" },
+ { "A", "R0", "0", "1" },
+ }
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 4, 2, 0, } },
+ { "A", "R1", { 2, 1, 0, } },
+ { "A", "R2", { 1, 1, 1, } },
+ { "A", "", { 0, 2, 1, } },
+ },
+ {
+ { "A", "R0", "0", "2" },
+ { "A", "R1", "0", "2" },
+ { "A", "", "1", "0" },
+ { "A", "R0", "0", "2" },
+ }
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 2, 1, 0, } },
+ { "A", "R1", { 3, 2, 1, } },
+ { "A", "R2", { 2, 3, 5, } },
+ { "A", "", { 6, 0, 0, } },
+ },
+ {
+ { "A", "", "0", "1" },
+ { "A", "", "0", "2" },
+ { "A", "R2", "2", "0" },
+ { "A", "", "0", "2" },
+ { "A", "R0", "0", "2" },
+ { "A", "R1", "0", "2" },
+ { "A", "", "0", "1" },
+ }
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 5, 1, 0, } },
+ { "A", "", { 2, 1, 0, } },
+ },
+ {
+ { "A", "R0", "0", "2" },
+ { "A", "R0", "0", "2" },
+ { "A", "", "0", "2" },
+ { "A", "R0", "0", "1" },
+ }
+ },
+ {
+ kNoLocations,
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 5, 1, 0, } },
+ { "A", "R1", { 0, 1, 5, } },
+ },
+ {
+ { "A", "R0", "0", "2" },
+ { "A", "R1", "2", "0" },
+ { "A", "R0", "0", "1" },
+ { "A", "R1", "2", "1" },
+ { "A", "R0", "0", "2" },
+ { "A", "R1", "2", "0" },
}
},
};
@@ -808,7 +1130,7 @@ TEST(RebalanceAlgoUnitTest, ManyMoves) {
kNoLocations,
{ "0", "1", "2", },
{
- { "A", { 100, 400, 100, } },
+ { "A", "", { 100, 400, 100, } },
},
};
constexpr size_t kExpectedMovesNum = 200;
@@ -819,9 +1141,9 @@ TEST(RebalanceAlgoUnitTest, ManyMoves) {
vector<TableReplicaMove> ref_moves;
for (size_t i = 0; i < kExpectedMovesNum; ++i) {
if (i % 2) {
- ref_moves.push_back({ "A", "1", "2" });
+ ref_moves.push_back({ "A", "", "1", "2" });
} else {
- ref_moves.push_back({ "A", "1", "0" });
+ ref_moves.push_back({ "A", "", "1", "0" });
}
}
@@ -858,6 +1180,7 @@ TEST(RebalanceAlgoUnitTest, RandomizedTest) {
}
table_replicas.push_back(TablePerServerReplicas{
Substitute("$0", i),
+ std::to_string(r.Next()), // a randomized tag
std::move(num_replicas_per_server),
});
}
@@ -900,7 +1223,7 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "2", }, },
},
{ "0", "1", "2", },
- { { "A", { 1, 0, 0, } }, },
+ { { "A", "", { 1, 0, 0, } }, },
{}
},
{
@@ -909,7 +1232,7 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "2", }, },
},
{ "0", "1", "2", },
- { { "A", { 0, 0, 1, } }, },
+ { { "A", "", { 0, 0, 1, } }, },
{}
},
{
@@ -918,7 +1241,7 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "2", }, },
},
{ "0", "1", "2", },
- { { "A", { 1, 1, 0, } }, },
+ { { "A", "", { 1, 1, 0, } }, },
{}
},
{
@@ -927,7 +1250,7 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "2", }, },
},
{ "0", "1", "2", },
- { { "A", { 1, 1, 1, } }, },
+ { { "A", "", { 1, 1, 1, } }, },
{}
},
{
@@ -936,8 +1259,8 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "2", }, },
},
{ "0", "1", "2", },
- { { "A", { 2, 1, 0, } }, },
- { { "A", "0", "2" }, }
+ { { "A", "", { 2, 1, 0, } }, },
+ { { "A", "", "0", "2" }, }
},
{
{
@@ -945,7 +1268,7 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "2", }, },
},
{ "0", "1", "2", },
- { { "A", { 1, 1, 2, } }, },
+ { { "A", "", { 1, 1, 2, } }, },
{}
},
{
@@ -954,8 +1277,8 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "2", }, },
},
{ "0", "1", "2", },
- { { "A", { 2, 1, 3, } }, },
- { { "A", "2", "1" }, }
+ { { "A", "", { 2, 1, 3, } }, },
+ { { "A", "", "2", "1" }, }
},
{
{
@@ -963,10 +1286,10 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "2", }, },
},
{ "0", "1", "2", },
- { { "A", { 2, 4, 0, } }, },
+ { { "A", "", { 2, 4, 0, } }, },
{
- { "A", "1", "2" },
- { "A", "1", "2" },
+ { "A", "", "1", "2" },
+ { "A", "", "1", "2" },
}
},
{
@@ -975,7 +1298,126 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "2", "3", "4", "5", }, },
},
{ "0", "1", "2", "3", "4", "5" },
- { { "A", { 1, 1, 1, 1, 1, 1, } }, },
+ { { "A", "", { 1, 1, 1, 1, 1, 1, } }, },
+ {}
+ },
+ {
+ {
+ { "L0", { "0", "1", }, },
+ { "L1", { "2", "3", "4", "5", }, },
+ },
+ { "0", "1", "2", "3", "4", "5" },
+ { { "A", "", { 2, 0, 4, 0, 0, 0, } }, },
+ {}
+ },
+ {
+ {
+ { "L0", { "0", }, },
+ { "L1", { "1", "2", "3", "4", "5", }, },
+ },
+ { "0", "1", "2", "3", "4", "5", },
+ { { "A", "", { 0, 1, 1, 1, 1, 1, } }, },
+ {}
+ },
+ {
+ {
+ { "L0", { "0", }, },
+ { "L1", { "1", "2", "3", "4", "5", }, },
+ },
+ { "0", "1", "2", "3", "4", "5", },
+ { { "A", "", { 0, 5, 0, 0, 0, 0, } }, },
+ {}
+ },
+ {
+ {
+ { "L0", { "0", }, },
+ { "L1", { "1", "2", "3", "4", "5", }, },
+ },
+ { "0", "1", "2", "3", "4", "5", },
+ { { "A", "", { 2, 1, 1, 1, 1, 0, } }, },
+ { { "A", "", "0", "5" }, }
+ },
+ };
+ VERIFY_LOCATION_BALANCING_MOVES(kConfigs);
+}
+
+TEST(RebalanceAlgoUnitTest, LocationBalancingRangesFewMoves) {
+ const TestClusterConfig kConfigs[] = {
+ {
+ {
+ { "L0", { "0", }, },
+ { "L1", { "1", }, },
+ { "L2", { "2", }, },
+ },
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 2, 1, 0, } },
+ { "A", "R1", { 2, 1, 0, } },
+ { "A", "R2", { 1, 1, 4, } },
+ },
+ {
+ { "A", "R0", "0", "2" },
+ { "A", "R1", "0", "2" },
+ { "A", "R2", "2", "0" },
+ { "A", "R2", "2", "1" },
+ },
+ { MovesOrderingComparison::IGNORE }
+ },
+ {
+ {
+ { "L0", { "0", "1", }, },
+ { "L1", { "2", }, },
+ },
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 1, 2, 0 } },
+ { "A", "R1", { 2, 0, 1 } },
+ { "A", "R2", { 2, 1, 0 } },
+ },
+ {
+ { "A", "R0", "0", "2" },
+ { "A", "R2", "0", "2" },
+ },
+ { MovesOrderingComparison::IGNORE }
+ },
+ {
+ {
+ { "L0", { "0", "1", }, },
+ { "L1", { "2", }, },
+ },
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 1, 0, 0, } },
+ { "A", "R1", { 3, 0, 0, } },
+ { "A", "R2", { 0, 0, 2, } },
+ },
+ {
+ { "A", "R2", "2", "1" },
+ { "A", "R1", "0", "2" },
+ }
+ },
+ {
+ {
+ { "L0", { "0", "1", }, },
+ { "L1", { "2", }, },
+ },
+ { "0", "1", "2", },
+ {
+ { "A", "R0", { 0, 0, 5, } },
+ },
+ {
+ { "A", "R0", "2", "0" },
+ { "A", "R0", "2", "1" },
+ { "A", "R0", "2", "0" },
+ }
+ },
+ {
+ {
+ { "L0", { "0", "1", }, },
+ { "L1", { "2", }, },
+ },
+ { "0", "1", "2", },
+ { { "A", "R0", { 1, 1, 0, } }, },
{}
},
{
@@ -984,7 +1426,7 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "2", "3", "4", "5", }, },
},
{ "0", "1", "2", "3", "4", "5" },
- { { "A", { 2, 0, 4, 0, 0, 0, } }, },
+ { { "A", "", { 2, 0, 4, 0, 0, 0, } }, },
{}
},
{
@@ -993,7 +1435,7 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "1", "2", "3", "4", "5", }, },
},
{ "0", "1", "2", "3", "4", "5", },
- { { "A", { 0, 1, 1, 1, 1, 1, } }, },
+ { { "A", "R0", { 0, 1, 1, 1, 1, 1, } }, },
{}
},
{
@@ -1002,7 +1444,7 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "1", "2", "3", "4", "5", }, },
},
{ "0", "1", "2", "3", "4", "5", },
- { { "A", { 0, 5, 0, 0, 0, 0, } }, },
+ { { "A", "R0", { 0, 5, 0, 0, 0, 0, } }, },
{}
},
{
@@ -1011,8 +1453,8 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingFewMoves) {
{ "L1", { "1", "2", "3", "4", "5", }, },
},
{ "0", "1", "2", "3", "4", "5", },
- { { "A", { 2, 1, 1, 1, 1, 0, } }, },
- { { "A", "0", "5" }, }
+ { { "A", "R0", { 2, 1, 1, 1, 1, 0, } }, },
+ { { "A", "R0", "0", "5" }, }
},
};
VERIFY_LOCATION_BALANCING_MOVES(kConfigs);
@@ -1028,8 +1470,8 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingSimpleST) {
{ "L2", { "2", }, },
},
{ "0", "1", "2", },
- { { "A", { 2, 1, 0, } }, },
- { { "A", "0", "2" }, }
+ { { "A", "", { 2, 1, 0, } }, },
+ { { "A", "", "0", "2" }, }
},
{
{
@@ -1038,12 +1480,12 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingSimpleST) {
{ "L2", { "2", }, },
},
{ "0", "1", "2", },
- { { "A", { 6, 0, 0, } }, },
+ { { "A", "", { 6, 0, 0, } }, },
{
- { "A", "0", "1" },
- { "A", "0", "2" },
- { "A", "0", "1" },
- { "A", "0", "2" },
+ { "A", "", "0", "1" },
+ { "A", "", "0", "2" },
+ { "A", "", "0", "1" },
+ { "A", "", "0", "2" },
},
{ MovesOrderingComparison::IGNORE }
},
@@ -1055,7 +1497,7 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingSimpleST) {
},
{ "0", "1", "2", },
{
- { "A", { 1, 0, 0, } },
+ { "A", "", { 1, 0, 0, } },
},
{}
},
@@ -1074,10 +1516,10 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingSimpleMT) {
},
{ "0", "1", "2", },
{
- { "A", { 2, 1, 1, } },
- { "B", { 0, 0, 2, } },
+ { "A", "", { 2, 1, 1, } },
+ { "B", "", { 0, 0, 2, } },
},
- { { "B", "2", "1" }, }
+ { { "B", "", "2", "1" }, }
},
{
{
@@ -1087,13 +1529,13 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingSimpleMT) {
},
{ "0", "1", "2", },
{
- { "A", { 2, 1, 0, } },
- { "B", { 0, 0, 3, } },
+ { "A", "", { 2, 1, 0, } },
+ { "B", "", { 0, 0, 3, } },
},
{
- { "B", "2", "1" },
- { "B", "2", "0" },
- { "A", "0", "2" },
+ { "B", "", "2", "1" },
+ { "B", "", "2", "0" },
+ { "A", "", "0", "2" },
}
},
{
@@ -1104,9 +1546,9 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingSimpleMT) {
},
{ "0", "1", "2", },
{
- { "A", { 1, 0, 0, } },
- { "B", { 1, 1, 2, } },
- { "C", { 10, 9, 10, } },
+ { "A", "", { 1, 0, 0, } },
+ { "B", "", { 1, 1, 2, } },
+ { "C", "", { 10, 9, 10, } },
},
{}
},
diff --git a/src/kudu/rebalance/rebalance_algo.cc b/src/kudu/rebalance/rebalance_algo.cc
index a1fda56f0..4d74e135d 100644
--- a/src/kudu/rebalance/rebalance_algo.cc
+++ b/src/kudu/rebalance/rebalance_algo.cc
@@ -29,6 +29,7 @@
#include <utility>
#include <vector>
+#include <boost/container_hash/extensions.hpp>
#include <boost/optional/optional.hpp>
#include <glog/logging.h>
@@ -104,6 +105,24 @@ Status MoveOneReplica(const string& src,
}
} // anonymous namespace
+size_t TableIdAndTagHash::operator()(const TableIdAndTag& idt) const noexcept {
+ size_t seed = 0;
+ boost::hash_combine(seed, idt.table_id);
+ boost::hash_combine(seed, idt.tag);
+ return seed;
+}
+
+bool TableIdAndTagEqual::operator()(const TableIdAndTag& lhs,
+ const TableIdAndTag& rhs) const {
+ return lhs.table_id == rhs.table_id && lhs.tag == rhs.tag;
+}
+
+std::ostream& operator<<(std::ostream& out, const TableIdAndTag& table_info) {
+ out << "table_id: " << table_info.table_id
+ << " table tag: " << table_info.tag;
+ return out;
+}
+
Status RebalancingAlgo::GetNextMoves(const ClusterInfo& cluster_info,
int max_moves_num,
vector<TableReplicaMove>* moves) {
@@ -163,7 +182,7 @@ Status RebalancingAlgo::ApplyMove(const TableReplicaMove& move,
bool found_table_info = false;
for (auto it = table_info_by_skew.begin(); it != table_info_by_skew.end(); ) {
TableBalanceInfo& info = it->second;
- if (info.table_id != move.table_id) {
+ if (info.table_id != move.table_id || info.tag != move.tag) {
++it;
continue;
}
@@ -245,22 +264,23 @@ Status TwoDimensionalGreedyAlgo::GetNextMove(
const auto& servers_by_table_replica_count = tbi.servers_by_replica_count;
if (servers_by_table_replica_count.empty()) {
return Status::InvalidArgument(Substitute(
- "no information on replicas of table $0", tbi.table_id));
+ "no information on replicas of table $0 tag '$1'",
+ tbi.table_id, tbi.tag));
}
const auto min_replica_count = servers_by_table_replica_count.begin()->first;
const auto max_replica_count = servers_by_table_replica_count.rbegin()->first;
VLOG(1) << Substitute(
- "balancing table $0 with replica count skew $1 "
- "(min_replica_count: $2, max_replica_count: $3)",
- tbi.table_id, table_info_by_skew.rbegin()->first,
+ "balancing table $0 (tag '$1') with replica count skew $2 "
+ "(min_replica_count: $3, max_replica_count: $4)",
+ tbi.table_id, tbi.tag, table_info_by_skew.rbegin()->first,
min_replica_count, max_replica_count);
// Compute the intersection of the tablet servers most loaded for the table
// with the tablet servers most loaded overall, and likewise for least loaded.
// These are our ideal candidates for moving from and to, respectively.
- int32_t max_count_table;
- int32_t max_count_total;
+ int32_t max_count_table = 0;
+ int32_t max_count_total = 0;
vector<string> max_loaded;
vector<string> max_loaded_intersection;
RETURN_NOT_OK(GetIntersection(
@@ -268,8 +288,8 @@ Status TwoDimensionalGreedyAlgo::GetNextMove(
servers_by_table_replica_count, servers_by_total_replica_count,
&max_count_table, &max_count_total,
&max_loaded, &max_loaded_intersection));
- int32_t min_count_table;
- int32_t min_count_total;
+ int32_t min_count_table = 0;
+ int32_t min_count_total = 0;
vector<string> min_loaded;
vector<string> min_loaded_intersection;
RETURN_NOT_OK(GetIntersection(
@@ -328,7 +348,8 @@ Status TwoDimensionalGreedyAlgo::GetNextMove(
// Move a replica of the selected table from a most loaded server to a
// least loaded server.
- *move = TableReplicaMove{ tbi.table_id, max_loaded_uuid, min_loaded_uuid };
+ *move = TableReplicaMove{ tbi.table_id, tbi.tag,
+ max_loaded_uuid, min_loaded_uuid };
break;
}
@@ -413,12 +434,12 @@ Status LocationBalancingAlgo::GetNextMove(
*move = boost::none;
// Per-table information on locations load.
- // TODO(aserbin): maybe, move this container into ClusterInfo?
- unordered_map<string, multimap<double, string>> location_load_info_by_table;
+ unordered_map<TableIdAndTag, multimap<double, string>,
+ TableIdAndTagHash, TableIdAndTagEqual> location_load_info_by_table;
// A dictionary to map location-wise load imbalance into table identifier.
// The most imbalanced tables come last.
- multimap<double, string> table_id_by_load_imbalance;
+ multimap<double, TableIdAndTag> table_id_by_load_imbalance;
for (const auto& elem : cluster_info.balance.table_info_by_skew) {
const auto& table_info = elem.second;
// Number of replicas of all tablets comprising the table, per location.
@@ -441,24 +462,26 @@ Status LocationBalancingAlgo::GetNextMove(
}
const auto& table_id = table_info.table_id;
+ const auto& table_tag = table_info.tag;
const auto load_min = location_by_load.cbegin()->first;
const auto load_max = location_by_load.crbegin()->first;
const auto imbalance = load_max - load_min;
DCHECK(!std::isnan(imbalance));
- table_id_by_load_imbalance.emplace(imbalance, table_id);
+ table_id_by_load_imbalance.emplace(imbalance,
+ TableIdAndTag{table_id, table_tag});
EmplaceOrDie(&location_load_info_by_table,
- table_id, std::move(location_by_load));
- }
+ TableIdAndTag{ table_id, table_tag },
+ std::move(location_by_load)); }
- string imbalanced_table_id;
- if (!IsBalancingNeeded(table_id_by_load_imbalance, &imbalanced_table_id)) {
+ TableIdAndTag imbalanced_table_info;
+ if (!IsBalancingNeeded(table_id_by_load_imbalance, &imbalanced_table_info)) {
// Nothing to do: all tables are location-balanced enough.
return Status::OK();
}
// Work on the most location-wise unbalanced tables first.
const auto& load_info = FindOrDie(
- location_load_info_by_table, imbalanced_table_id);
+ location_load_info_by_table, imbalanced_table_info);
vector<string> loc_loaded_least;
{
@@ -496,18 +519,19 @@ Status LocationBalancingAlgo::GetNextMove(
VLOG(1) << "loc_leaded_most: " << s.str();
}
- return FindBestMove(imbalanced_table_id, loc_loaded_least, loc_loaded_most,
+ return FindBestMove(imbalanced_table_info, loc_loaded_least, loc_loaded_most,
cluster_info, move);
}
bool LocationBalancingAlgo::IsBalancingNeeded(
const TableByLoadImbalance& imbalance_info,
- string* most_imbalanced_table_id) const {
+ TableIdAndTag* most_imbalanced_table_info) const {
if (PREDICT_FALSE(VLOG_IS_ON(1))) {
ostringstream ss;
ss << "Table imbalance report: " << endl;
for (const auto& elem : imbalance_info) {
- ss << " " << elem.second << ": " << elem.first << endl;
+ ss << " " << elem.second.table_id << ":" << elem.second.tag
+ << ": " << elem.first << endl;
}
VLOG(1) << ss.str();
}
@@ -530,7 +554,7 @@ bool LocationBalancingAlgo::IsBalancingNeeded(
const auto it = imbalance_info.crbegin();
const auto imbalance = it->first;
if (imbalance > load_imbalance_threshold_) {
- *most_imbalanced_table_id = it->second;
+ *most_imbalanced_table_info = it->second;
return true;
}
return false;
@@ -540,7 +564,7 @@ bool LocationBalancingAlgo::IsBalancingNeeded(
// the source and destination tablet server to move a replica of the specified
// tablet to improve per-table location load balance as much as possible.
Status LocationBalancingAlgo::FindBestMove(
- const string& table_id,
+ const TableIdAndTag& table_info,
const vector<string>& loc_loaded_least,
const vector<string>& loc_loaded_most,
const ClusterInfo& cluster_info,
@@ -612,7 +636,7 @@ Status LocationBalancingAlgo::FindBestMove(
const auto& src_ts_id = it_max->second;
CHECK_NE(src_ts_id, dst_ts_id);
- *move = TableReplicaMove{ table_id, src_ts_id, dst_ts_id };
+ *move = TableReplicaMove{ table_info.table_id, table_info.tag, src_ts_id, dst_ts_id };
return Status::OK();
}
diff --git a/src/kudu/rebalance/rebalance_algo.h b/src/kudu/rebalance/rebalance_algo.h
index 67f12670f..660bbedd1 100644
--- a/src/kudu/rebalance/rebalance_algo.h
+++ b/src/kudu/rebalance/rebalance_algo.h
@@ -16,7 +16,9 @@
// under the License.
#pragma once
+#include <cstddef>
#include <cstdint>
+#include <iosfwd>
#include <map>
#include <random>
#include <set>
@@ -40,10 +42,36 @@ namespace rebalance {
// replica counts.
typedef std::multimap<int32_t, std::string> ServersByCountMap;
+// A structure to contain information on a table with attribution to a grouping
+// criterion encoded in the 'tag' field. This is used for table range
+// rebalancing: in such case, the 'tag' field contains information on the key
+// for a particular range of the table.
+struct TableIdAndTag {
+ std::string table_id;
+ std::string tag;
+};
+
+struct TableIdAndTagHash {
+ size_t operator()(const TableIdAndTag& idt) const noexcept;
+};
+
+struct TableIdAndTagEqual {
+ bool operator()(const TableIdAndTag& lhs, const TableIdAndTag& rhs) const;
+};
+
+std::ostream& operator<<(std::ostream& out, const TableIdAndTag& table_info);
+
// Balance information for a table.
struct TableBalanceInfo {
+ // The identifier of a table (table UUID).
std::string table_id;
+ // A tag for grouping table-specific information (e.g., the key for the
+ // beginning of a table range). The 'servers_by_replica_count' map has data
+ // aggregated for the {table_id, tag} pair correspondingly. This is used for
+ // range rebalancing.
+ std::string tag;
+
// Mapping table replica count -> tablet server.
//
// The table replica count of a tablet server is defined as the number of
@@ -92,9 +120,18 @@ struct ClusterInfo {
// A directive to move some replica of a table between two tablet servers.
struct TableReplicaMove {
+ // The identifier of the table whose replicas are to be moved.
std::string table_id;
- std::string from; // Unique identifier of the source tablet server.
- std::string to; // Unique identifier of the target tablet server.
+
+ // Tag/hint to find matching replicas for this move. For example, if
+ // rebalancing range partitions, that's the key of the range partition start.
+ std::string tag;
+
+ // Unique identifier of the source tablet server.
+ std::string from;
+
+ // Unique identifier of the target tablet server.
+ std::string to;
};
// A rebalancing algorithm, which orders replica moves aiming to balance a
@@ -266,7 +303,7 @@ class LocationBalancingAlgo : public RebalancingAlgo {
boost::optional<TableReplicaMove>* move) override;
private:
FRIEND_TEST(RebalanceAlgoUnitTest, RandomizedTest);
- typedef std::multimap<double, std::string> TableByLoadImbalance;
+ typedef std::multimap<double, TableIdAndTag> TableByLoadImbalance;
// Check if any rebalancing is needed across cluster locations based on the
// information provided by the 'imbalance_info' parameter. Returns 'true'
@@ -274,14 +311,15 @@ class LocationBalancingAlgo : public RebalancingAlgo {
// the identifier of the most cross-location imbalanced table is output into
// the 'most_imbalanced_table_id' parameter (which must not be null).
bool IsBalancingNeeded(const TableByLoadImbalance& imbalance_info,
- std::string* most_imbalanced_table_id) const;
+ TableIdAndTag* most_imbalanced_table_info) const;
// Given the set of the most and the least table-wise loaded locations, choose
// the source and destination tablet server to move a replica of the specified
// tablet to improve per-table location load balance as much as possible.
// If no replica can be moved to balance the load, the 'move' output parameter
// is set to 'boost::none'.
- Status FindBestMove(const std::string& table_id,
+ static Status FindBestMove(
+ const TableIdAndTag& table_info,
const std::vector<std::string>& loc_loaded_least,
const std::vector<std::string>& loc_loaded_most,
const ClusterInfo& cluster_info,
diff --git a/src/kudu/rebalance/rebalancer.cc b/src/kudu/rebalance/rebalancer.cc
index f93dc99b7..918c7af17 100644
--- a/src/kudu/rebalance/rebalancer.cc
+++ b/src/kudu/rebalance/rebalancer.cc
@@ -74,7 +74,8 @@ Rebalancer::Config::Config(vector<string> ignored_tservers_param,
bool run_intra_location_rebalancing,
double load_imbalance_threshold,
bool force_rebalance_replicas_on_maintenance_tservers,
- size_t intra_location_rebalancing_concurrency)
+ size_t intra_location_rebalancing_concurrency,
+ bool enable_range_rebalancing)
: ignored_tservers(ignored_tservers_param.begin(), ignored_tservers_param.end()),
master_addresses(std::move(master_addresses)),
table_filters(std::move(table_filters)),
@@ -91,7 +92,8 @@ Rebalancer::Config::Config(vector<string> ignored_tservers_param,
force_rebalance_replicas_on_maintenance_tservers(
force_rebalance_replicas_on_maintenance_tservers),
intra_location_rebalancing_concurrency(
- intra_location_rebalancing_concurrency) {
+ intra_location_rebalancing_concurrency),
+ enable_range_rebalancing(enable_range_rebalancing) {
DCHECK_GE(max_moves_per_server, 0);
}
@@ -114,7 +116,9 @@ Rebalancer::Rebalancer(Config config)
void Rebalancer::FindReplicas(const TableReplicaMove& move,
const ClusterRawInfo& raw_info,
vector<string>* tablet_ids) {
+ const bool is_range_rebalancing = config_.enable_range_rebalancing;
const auto& table_id = move.table_id;
+ const auto& tag = move.tag;
// Tablet ids of replicas on the source tserver that are non-leaders.
vector<string> tablet_uuids_src;
@@ -127,6 +131,9 @@ void Rebalancer::FindReplicas(const TableReplicaMove& move,
if (tablet_summary.table_id != table_id) {
continue;
}
+ if (is_range_rebalancing && tablet_summary.range_key_begin != tag) {
+ continue;
+ }
if (tablet_summary.result != HealthCheckResult::HEALTHY) {
VLOG(1) << Substitute("table $0: not considering replicas of tablet $1 "
"as candidates for movement since the tablet's "
@@ -283,13 +290,20 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
DCHECK(info);
// tserver UUID --> total replica count of all table's tablets at the server
+ // (tagged context applies here)
typedef unordered_map<string, int32_t> TableReplicasAtServer;
// The result information to build.
ClusterInfo result_info;
+ // tserver UUID --> total count of replicas at the server
unordered_map<string, int32_t> tserver_replicas_count;
- unordered_map<string, TableReplicasAtServer> table_replicas_info;
+
+ // table_id.range_key --> count of tablet replicas of the table at tservers
+ unordered_map<TableIdAndTag, TableReplicasAtServer,
+ TableIdAndTagHash, TableIdAndTagEqual> table_replicas_info;
+
+ // UUIDs of unhealthy tablet servers.
unordered_set<string> unhealthy_tablet_servers;
// Build a set of tables with RF=1 (single replica tables).
@@ -368,7 +382,10 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
it != tserver_replicas_count.end()) {
it->second++;
auto table_ins = table_replicas_info.emplace(
- tablet.table_id, TableReplicasAtServer());
+ TableIdAndTag{ tablet.table_id,
+ config_.enable_range_rebalancing
+ ? tablet.range_key_begin : "" },
+ TableReplicasAtServer());
TableReplicasAtServer& replicas_at_server = table_ins.first->second;
auto replicas_ins = replicas_at_server.emplace(move_info.ts_uuid_to, 0);
@@ -404,7 +421,8 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
}
auto table_ins = table_replicas_info.emplace(
- tablet.table_id, TableReplicasAtServer());
+ TableIdAndTag{tablet.table_id, tablet.range_key_begin},
+ TableReplicasAtServer());
TableReplicasAtServer& replicas_at_server = table_ins.first->second;
auto replicas_ins = replicas_at_server.emplace(ri.ts_uuid, 0);
@@ -439,11 +457,13 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
// Populate ClusterBalanceInfo::table_info_by_skew
auto& table_info_by_skew = result_info.balance.table_info_by_skew;
for (const auto& elem : table_replicas_info) {
- const auto& table_id = elem.first;
+ const auto& table_id = elem.first.table_id;
+ const auto& tag = elem.first.tag;
int32_t max_count = numeric_limits<int32_t>::min();
int32_t min_count = numeric_limits<int32_t>::max();
TableBalanceInfo tbi;
tbi.table_id = table_id;
+ tbi.tag = tag;
for (const auto& e : elem.second) {
const auto& ts_uuid = e.first;
const auto replica_count = e.second;
diff --git a/src/kudu/rebalance/rebalancer.h b/src/kudu/rebalance/rebalancer.h
index 079fbcc8a..a8a862efa 100644
--- a/src/kudu/rebalance/rebalancer.h
+++ b/src/kudu/rebalance/rebalancer.h
@@ -69,7 +69,8 @@ class Rebalancer {
bool run_intra_location_rebalancing = true,
double load_imbalance_threshold = kLoadImbalanceThreshold,
bool force_rebalance_replicas_on_maintenance_tservers = false,
- size_t intra_location_rebalancing_concurrency = 0);
+ size_t intra_location_rebalancing_concurrency = 0,
+ bool enable_range_rebalancing = false);
// UUIDs of ignored servers. If empty, run the rebalancing on
// all tablet servers in the cluster only when all tablet servers
@@ -141,6 +142,9 @@ class Rebalancer {
// The maximum number of intra-location rebalancing sessions that can be run
// in parallel. Value of 0 means 'the number of CPU cores at the node'.
size_t intra_location_rebalancing_concurrency;
+
+ // Whether to rebalance ranges of a table.
+ bool enable_range_rebalancing;
};
// Represents a concrete move of a replica from one tablet server to another.
@@ -180,9 +184,9 @@ class Rebalancer {
// of the 'tablet_ids' container and tablet server UUIDs TableReplicaMove::from
// and TableReplica::to correspondingly. If no suitable tablet replicas are found,
// 'tablet_ids' will be empty.
- static void FindReplicas(const TableReplicaMove& move,
- const ClusterRawInfo& raw_info,
- std::vector<std::string>* tablet_ids);
+ void FindReplicas(const TableReplicaMove& move,
+ const ClusterRawInfo& raw_info,
+ std::vector<std::string>* tablet_ids);
// Convert the 'raw' information about the cluster into information suitable
// for the input of the high-level rebalancing algorithm.
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index e9af759df..f72a2d7ea 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -41,6 +41,7 @@
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/common/partial_row.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/quorum_util.h"
@@ -511,18 +512,46 @@ static Status CreateTables(
const string& table_name_pattern,
int num_tables,
int rep_factor,
+ int num_table_hash_partitions = 3,
+ int num_table_range_partitions = 0,
vector<string>* table_names = nullptr) {
+ DCHECK_GE(num_table_range_partitions, 0);
+ if (num_table_range_partitions != 0 && num_table_range_partitions < 2) {
+ // That's an artificial restriction to simplify the logic for creating
+ // range partitions which cover the whole key space.
+ return Status::InvalidArgument(
+ "number of range partitions should be at least 2");
+ }
// Create tables with their tablet replicas landing only on the tablet servers
// which are up and running.
auto client_schema = KuduSchema::FromSchema(table_schema);
for (auto i = 0; i < num_tables; ++i) {
string table_name = Substitute(table_name_pattern, i);
unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
- RETURN_NOT_OK(table_creator->table_name(table_name)
- .schema(&client_schema)
- .add_hash_partitions({ "key" }, 3)
- .num_replicas(rep_factor)
- .Create());
+ table_creator->table_name(table_name);
+ table_creator->schema(&client_schema);
+ table_creator->num_replicas(rep_factor);
+ if (num_table_hash_partitions > 1) {
+ table_creator->add_hash_partitions({ "key" }, num_table_hash_partitions);
+ }
+ for (auto i = 0; i < num_table_range_partitions; ++i) {
+ unique_ptr<KuduPartialRow> lower_bound(client_schema.NewRow());
+ unique_ptr<KuduPartialRow> upper_bound(client_schema.NewRow());
+ if (i == 0) {
+ RETURN_NOT_OK(lower_bound->SetInt32("key", INT32_MIN));
+ RETURN_NOT_OK(upper_bound->SetInt32("key", 0));
+ } else if (i + 1 == num_table_range_partitions) {
+ RETURN_NOT_OK(lower_bound->SetInt32("key", (i - 1) * 1000));
+ RETURN_NOT_OK(upper_bound->SetInt32("key", INT32_MAX));
+ } else {
+ RETURN_NOT_OK(lower_bound->SetInt32("key", (i - 1) * 1000));
+ RETURN_NOT_OK(upper_bound->SetInt32("key", i * 1000));
+ }
+ table_creator->add_range_partition(lower_bound.release(),
+ upper_bound.release());
+ }
+ RETURN_NOT_OK(table_creator->Create());
+
RETURN_NOT_OK(RunKuduTool({
"perf",
"loadgen",
@@ -551,6 +580,8 @@ static Status CreateUnbalancedTables(
int tserver_idx_from,
int tserver_num,
int tserver_unresponsive_ms,
+ int num_table_hash_partitions,
+ int num_table_range_partitions,
vector<string>* table_names = nullptr) {
// Keep running only some tablet servers and shut down the rest.
for (auto i = tserver_idx_from; i < tserver_num; ++i) {
@@ -560,8 +591,9 @@ static Status CreateUnbalancedTables(
// Wait for the catalog manager to understand that not all tablet servers
// are available.
SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
- RETURN_NOT_OK(CreateTables(cluster, client, table_schema, table_name_pattern,
- num_tables, rep_factor, table_names));
+ RETURN_NOT_OK(CreateTables(
+ cluster, client, table_schema, table_name_pattern, num_tables, rep_factor,
+ num_table_hash_partitions, num_table_range_partitions, table_names));
for (auto i = tserver_idx_from; i < tserver_num; ++i) {
RETURN_NOT_OK(cluster->tablet_server(i)->Restart());
}
@@ -589,6 +621,8 @@ TEST_P(RebalanceParamTest, Rebalance) {
constexpr auto kNumTables = 5;
const string table_name_pattern = "rebalance_test_table_$0";
constexpr auto kTserverUnresponsiveMs = 3000;
+ constexpr auto kNumTableHashPartitions = 3;
+ constexpr auto kNumTableRangePartitions = 0;
const auto timeout = MonoDelta::FromSeconds(30);
const vector<string> kMasterFlags = {
"--allow_unsafe_replication_factor",
@@ -605,7 +639,8 @@ TEST_P(RebalanceParamTest, Rebalance) {
ASSERT_OK(CreateUnbalancedTables(
cluster_.get(), client_.get(), schema_, table_name_pattern, kNumTables,
- kRepFactor, kRepFactor + 1, kNumTservers, kTserverUnresponsiveMs));
+ kRepFactor, kRepFactor + 1, kNumTservers, kTserverUnresponsiveMs,
+ kNumTableHashPartitions, kNumTableRangePartitions));
// Workloads aren't run for 3-2-3 replica movement with RF = 1 because
// the tablet is unavailable during the move until the target voter replica
@@ -709,6 +744,14 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
virtual bool is_343_scheme() const = 0;
+ virtual int num_hash_partitions_for_test_tables() const {  // hash partition count passed to the table-creation helpers; overridable
+ return 3;
+ }
+
+ virtual int num_range_partitions_for_test_tables() const {  // range partition count passed to the table-creation helpers; overridable
+ return 0;  // zero: CreateTables() adds no range partitions
+ }
+
protected:
static const char* const kExitOnSignalStr;
static const char* const kTableNamePattern;
@@ -737,7 +780,10 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
ASSERT_OK(CreateUnbalancedTables(
cluster_.get(), client_.get(), schema_, kTableNamePattern,
num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
- tserver_unresponsive_ms_, created_tables_names));
+ tserver_unresponsive_ms_,
+ num_hash_partitions_for_test_tables(),
+ num_range_partitions_for_test_tables(),
+ created_tables_names));
} else {
ASSERT_OK(CreateTablesExcludingLocations(empty_locations,
kTableNamePattern,
@@ -778,6 +824,8 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms_ / 4));
RETURN_NOT_OK(CreateTables(cluster_.get(), client_.get(), schema_,
table_name_pattern, num_tables_, rep_factor_,
+ num_hash_partitions_for_test_tables(),
+ num_range_partitions_for_test_tables(),
table_names));
// Start tablet servers at the excluded locations.
if (!excluded_locations.empty()) {
@@ -822,6 +870,8 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms_ / 4));
RETURN_NOT_OK(CreateTables(cluster_.get(), client_.get(), schema_,
table_name_pattern, num_tables_, rep_factor_,
+ num_hash_partitions_for_test_tables(),
+ num_range_partitions_for_test_tables(),
table_names));
// Start tablet servers at the excluded locations.
for (const auto& uuid : excluded_tserver_uuids) {
@@ -2226,5 +2276,90 @@ TEST_F(IntraLocationRebalancingBasicTest, LocationsWithEmptyTabletServers) {
}
}
+class TableRangeRebalancingTest : public RebalancingTest {  // fixture for range rebalancing of a single table
+ public:
+ TableRangeRebalancingTest()
+ : RebalancingTest(/*num_tables=*/ 1,
+ /*rep_factor=*/ 3,
+ /*num_tservers=*/ 3) {
+ }
+
+ bool is_343_scheme() const override {
+ return true;
+ }
+
+ int num_hash_partitions_for_test_tables() const override {
+ return 8;  // overrides the base-class default of 3
+ }
+
+ int num_range_partitions_for_test_tables() const override {
+ return 2;  // smallest non-zero value CreateTables() accepts
+ }
+};
+
+TEST_F(TableRangeRebalancingTest, Basic) {  // run range rebalancing for one table, then verify the reported distribution
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ constexpr const char kPerServerReferenceOutput[] =  // expected per-server summary: 16 replicas on every server
+ R"***(Per-server replica distribution summary:
+ Statistic | Value
+-----------------------+-----------
+ Minimum Replica Count | 16
+ Maximum Replica Count | 16
+ Average Replica Count | 16)***";
+
+ constexpr const char kReplicaDistributionReferenceOutput[] =  // expected skew summary: all zeros
+ R"***(Per-table replica distribution summary:
+ Replica Skew | Value
+--------------+----------
+ Minimum | 0
+ Maximum | 0
+ Average | 0)***";
+
+ // There should be 8 tablet replicas for a particular range per tablet server.
+ constexpr const char kFirstRangeReferencePattern[] =  // regex: range header, two more lines, then three rows ending in 8
+ "Range start key: '00000000'\n.+\n.+\n .+ 8\n .+ 8\n .+ 8\n";
+ constexpr const char kSecondRangeReferencePattern[] =  // same shape for the second range start key
+ "Range start key: 'ff80000000'\n.+\n.+\n .+ 8\n .+ 8\n .+ 8\n";
+
+ vector<string> table_names;
+ NO_FATALS(Prepare({}, {}, {}, kEmptySet, &table_names));
+ ASSERT_EQ(1, table_names.size());  // the tool is invoked below with a single --tables entry
+ {  // Pass 1: run the rebalancer with range rebalancing enabled.
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ "--enable_range_rebalancing",
+ Substitute("--tables=$0", table_names.front()),
+ };
+ string out;
+ string err;
+ const Status s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << "stderr: " << err;
+ }
+ {  // Pass 2: report-only run with distribution details to inspect the result.
+ const vector<string> report_tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ "--enable_range_rebalancing",
+ Substitute("--tables=$0", table_names.front()),
+ "--report_only",
+ "--output_replica_distribution_details",
+ };
+ string out;
+ string err;
+ const Status s = RunKuduTool(report_tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out, kPerServerReferenceOutput);
+ ASSERT_STR_CONTAINS(out, kReplicaDistributionReferenceOutput);
+ ASSERT_STR_MATCHES(out, kFirstRangeReferencePattern);
+ ASSERT_STR_MATCHES(out, kSecondRangeReferencePattern);
+ }
+}
+
} // namespace tools
} // namespace kudu
diff --git a/src/kudu/tools/rebalancer_tool.cc b/src/kudu/tools/rebalancer_tool.cc
index f08526be4..a9d50ed50 100644
--- a/src/kudu/tools/rebalancer_tool.cc
+++ b/src/kudu/tools/rebalancer_tool.cc
@@ -22,12 +22,15 @@
#include <functional>
#include <iostream>
#include <iterator>
+#include <limits>
#include <map>
#include <memory>
#include <numeric>
#include <random>
#include <set>
#include <string>
+#include <tuple>
+#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
@@ -493,6 +496,15 @@ Status RebalancerTool::PrintLocationBalanceStats(const string& location,
out << "--------------------------------------------------" << endl;
}
+ // Build dictionary to resolve tablet server UUID into its RPC address.
+ unordered_map<string, string> tserver_endpoints;
+ {
+ const auto& tserver_summaries = raw_info.tserver_summaries;
+ for (const auto& summary : tserver_summaries) {
+ tserver_endpoints.emplace(summary.uuid, summary.address);
+ }
+ }
+
// Per-server replica distribution stats.
{
out << "Per-server replica distribution summary:" << endl;
@@ -521,17 +533,10 @@ Status RebalancerTool::PrintLocationBalanceStats(const string& location,
out << endl;
if (config_.output_replica_distribution_details) {
- const auto& tserver_summaries = raw_info.tserver_summaries;
- unordered_map<string, string> tserver_endpoints;
- for (const auto& summary : tserver_summaries) {
- tserver_endpoints.emplace(summary.uuid, summary.address);
- }
-
out << "Per-server replica distribution details:" << endl;
DataTable servers_info({ "UUID", "Address", "Replica Count" });
- for (const auto& elem : servers_load_info) {
- const auto& id = elem.second;
- servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) });
+ for (const auto& [load, id] : servers_load_info) {
+ servers_info.AddRow({ id, tserver_endpoints[id], to_string(load) });
}
RETURN_NOT_OK(servers_info.PrintTo(out));
out << endl;
@@ -568,24 +573,99 @@ Status RebalancerTool::PrintLocationBalanceStats(const string& location,
for (const auto& summary : table_summaries) {
table_info.emplace(summary.id, &summary);
}
- out << "Per-table replica distribution details:" << endl;
- DataTable skew(
- { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
- for (const auto& elem : table_skew_info) {
- const auto& table_id = elem.second.table_id;
- const auto it = table_info.find(table_id);
- const auto* table_summary =
- (it == table_info.end()) ? nullptr : it->second;
- const auto& table_name = table_summary ? table_summary->name : "";
- const auto total_replica_count = table_summary
- ? table_summary->replication_factor * table_summary->TotalTablets()
- : 0;
- skew.AddRow({ table_id,
- to_string(total_replica_count),
- to_string(elem.first),
- table_name });
+ if (config_.enable_range_rebalancing) {
+ out << "Per-range replica distribution details for tables" << endl;
+
+ // Build mapping {table_id, tag} --> per-server replica count map.
+ // Using ordered dictionary since it's targeted for printing later.
+ map<pair<string, string>, map<string, size_t>> range_dist_stats;
+ for (const auto& [_, balance_info] : table_skew_info) {
+ const auto& table_id = balance_info.table_id;
+ const auto& tag = balance_info.tag;
+ auto it = range_dist_stats.emplace(
+ std::make_pair(table_id, tag), map<string, size_t>{});
+ const auto& server_info = balance_info.servers_by_replica_count;
+ for (const auto& [count, server_uuid] : server_info) {
+ auto count_it = it.first->second.emplace(server_uuid, 0).first;
+ count_it->second += count;
+ }
+ }
+
+ // Build the mapping for the per-range skew summary table, i.e.
+ // {table_id, tag} --> {num_of_replicas, per_server_replica_skew}.
+ map<pair<string, string>, pair<size_t, size_t>> range_skew_stats;
+ for (const auto& [table_range, per_server_stats] : range_dist_stats) {
+ size_t total_count = 0;
+ size_t min_per_server_count = std::numeric_limits<size_t>::max();
+ size_t max_per_server_count = std::numeric_limits<size_t>::min();
+ for (const auto& [server_uuid, replica_count] : per_server_stats) {
+ total_count += replica_count;
+ if (replica_count > max_per_server_count) {
+ max_per_server_count = replica_count;
+ }
+ if (replica_count < min_per_server_count) {
+ min_per_server_count = replica_count;
+ }
+ }
+ size_t skew = max_per_server_count - min_per_server_count;
+ range_skew_stats.emplace(table_range, std::make_pair(total_count, skew));
+ }
+
+ string prev_table_id;
+ for (const auto& [table_info, per_server_stats] : range_dist_stats) {
+ const auto& table_id = table_info.first;
+ const auto& table_range = table_info.second;
+ if (prev_table_id != table_id) {
+ prev_table_id = table_id;
+ out << endl << "Table: " << table_id << endl << endl;
+ out << "Number of tablet replicas at servers for each range" << endl;
+ DataTable range_skew_summary_table(
+ { "Max Skew", "Total Count", "Range Start Key" });
+ const auto it_begin = range_skew_stats.find(table_info);
+ for (auto it = it_begin; it != range_skew_stats.end(); ++it) {
+ const auto& cur_table_id = it->first.first;
+ if (cur_table_id != table_id) {
+ break;
+ }
+ const auto& range = it->first.second;
+ const auto replica_count = it->second.first;
+ const auto replica_skew = it->second.second;
+ range_skew_summary_table.AddRow(
+ { to_string(replica_skew), to_string(replica_count), range });
+ }
+ RETURN_NOT_OK(range_skew_summary_table.PrintTo(out));
+ out << endl;
+ }
+ out << "Range start key: '" << table_range << "'" << endl;
+ DataTable skew_table({ "UUID", "Server address", "Replica Count" });
+ for (const auto& stat : per_server_stats) {
+ const auto& srv_uuid = stat.first;
+ const auto& srv_address = FindOrDie(tserver_endpoints, srv_uuid);
+ skew_table.AddRow({ srv_uuid, srv_address, to_string(stat.second) });
+ }
+ RETURN_NOT_OK(skew_table.PrintTo(out));
+ out << endl;
+ }
+ } else {
+ out << "Per-table replica distribution details:" << endl;
+ DataTable skew_table(
+ { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
+ for (const auto& [skew, balance_info] : table_skew_info) {
+ const auto& table_id = balance_info.table_id;
+ const auto it = table_info.find(table_id);
+ const auto* table_summary =
+ (it == table_info.end()) ? nullptr : it->second;
+ const auto& table_name = table_summary ? table_summary->name : "";
+ const auto total_replica_count = table_summary
+ ? table_summary->replication_factor * table_summary->TotalTablets()
+ : 0;
+ skew_table.AddRow({ table_id,
+ to_string(total_replica_count),
+ to_string(skew),
+ table_name });
+ }
+ RETURN_NOT_OK(skew_table.PrintTo(out));
}
- RETURN_NOT_OK(skew.PrintTo(out));
out << endl;
}
}
@@ -1107,7 +1187,7 @@ Status RebalancerTool::AlgoBasedRunner::GetNextMovesImpl(
});
for (const auto& move : moves) {
vector<string> tablet_ids;
- FindReplicas(move, raw_info, &tablet_ids);
+ rebalancer_->FindReplicas(move, raw_info, &tablet_ids);
if (!loc) {
// In case of cross-location (a.k.a. inter-location) rebalancing it is
// necessary to make sure the majority of replicas would not end up
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index 77bff06c8..b0def5365 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -138,6 +138,9 @@ DEFINE_bool(disable_intra_location_rebalancing, false,
"replica distribution within each location. "
"This setting is applicable to multi-location clusters only.");
+DEFINE_bool(enable_range_rebalancing, false,
+ "Whether to enable table range rebalancing");
+
DEFINE_bool(move_replicas_from_ignored_tservers, false,
"Whether to move replicas from the specified 'ignored_tservers' to other "
"servers when the source tablet server is healthy. "
@@ -320,6 +323,12 @@ Status RunRebalance(const RunnerContext& context) {
const vector<string> table_filters =
Split(FLAGS_tables, ",", strings::SkipEmpty());
+ if (FLAGS_enable_range_rebalancing && table_filters.size() != 1) {
+ return Status::NotSupported(
+ "range rebalancing is currently implemented for a single table only: "
+ "use '--tables' to specify a table for range rebalancing");
+ }
+
// Evaluate --move_single_replicas flag: decide whether to enable or disable
// moving of single-replica tablets based on the reported version of the
// Kudu components.
@@ -341,7 +350,8 @@ Status RunRebalance(const RunnerContext& context) {
!FLAGS_disable_intra_location_rebalancing,
FLAGS_load_imbalance_threshold,
FLAGS_force_rebalance_replicas_on_maintenance_tservers,
- FLAGS_intra_location_rebalancing_concurrency));
+ FLAGS_intra_location_rebalancing_concurrency,
+ FLAGS_enable_range_rebalancing));
// Print info on pre-rebalance distribution of replicas.
RETURN_NOT_OK(rebalancer.PrintStats(cout));
@@ -443,6 +453,7 @@ unique_ptr<Mode> BuildClusterMode() {
.AddOptionalParameter("move_replicas_from_ignored_tservers")
.AddOptionalParameter("move_single_replicas")
.AddOptionalParameter("output_replica_distribution_details")
+ .AddOptionalParameter("enable_range_rebalancing")
.AddOptionalParameter("report_only")
.AddOptionalParameter("tables")
.Build();