You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2018/10/29 23:09:34 UTC
[3/4] kudu git commit: [rebalancer] location-aware rebalancer (part 3/n)
[rebalancer] location-aware rebalancer (part 3/n)
Some refactoring of the rebalancer code:
* Separated rebalancer-relevant information from KsckResults into
a dedicated ClusterRawInfo structure.
* Added the ClusterInfo structure; ClusterBalanceInfo is now aggregated
into it as the 'balance' sub-field.
Change-Id: Ie89f1958238dbc28b44111038d4b82f49f824ca9
Reviewed-on: http://gerrit.cloudera.org:8080/11744
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/43161e5a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/43161e5a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/43161e5a
Branch: refs/heads/master
Commit: 43161e5aa75764a7f1748f85e0500fc454a301d8
Parents: fd50723
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Oct 19 16:22:10 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Mon Oct 29 23:05:42 2018 +0000
----------------------------------------------------------------------
src/kudu/tools/rebalance-test.cc | 26 +++++------
src/kudu/tools/rebalance_algo-test.cc | 49 ++++++++++----------
src/kudu/tools/rebalance_algo.cc | 38 ++++++---------
src/kudu/tools/rebalance_algo.h | 25 ++++++----
src/kudu/tools/rebalancer.cc | 74 ++++++++++++++++++------------
src/kudu/tools/rebalancer.h | 31 ++++++++-----
6 files changed, 135 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/43161e5a/src/kudu/tools/rebalance-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance-test.cc b/src/kudu/tools/rebalance-test.cc
index ec59f55..badac31 100644
--- a/src/kudu/tools/rebalance-test.cc
+++ b/src/kudu/tools/rebalance-test.cc
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-#include "kudu/tools/rebalancer.h"
-
#include <algorithm>
#include <cstdint>
#include <iostream>
@@ -31,6 +29,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tools/ksck_results.h"
#include "kudu/tools/rebalance_algo.h"
+#include "kudu/tools/rebalancer.h"
#include "kudu/util/test_macros.h"
using std::inserter;
@@ -83,10 +82,10 @@ struct KsckResultsTestConfig {
ClusterBalanceInfo ref_balance_info;
};
-KsckResults GenerateKsckResults(KsckResultsInput input) {
- KsckResults results;
+ClusterRawInfo GenerateRawClusterInfo(const KsckResultsInput& input) {
+ ClusterRawInfo raw_info;
{
- vector<KsckServerHealthSummary>& summaries = results.tserver_summaries;
+ vector<KsckServerHealthSummary>& summaries = raw_info.tserver_summaries;
for (const auto& summary_input : input.tserver_summaries) {
KsckServerHealthSummary summary;
summary.uuid = summary_input.uuid;
@@ -94,7 +93,7 @@ KsckResults GenerateKsckResults(KsckResultsInput input) {
}
}
{
- vector<KsckTabletSummary>& summaries = results.tablet_summaries;
+ vector<KsckTabletSummary>& summaries = raw_info.tablet_summaries;
for (const auto& summary_input : input.tablet_summaries) {
KsckTabletSummary summary;
summary.id = summary_input.id;
@@ -110,7 +109,7 @@ KsckResults GenerateKsckResults(KsckResultsInput input) {
}
}
{
- vector<KsckTableSummary>& summaries = results.table_summaries;
+ vector<KsckTableSummary>& summaries = raw_info.table_summaries;
for (const auto& summary_input : input.table_summaries) {
KsckTableSummary summary;
summary.id = summary_input.id;
@@ -118,7 +117,7 @@ KsckResults GenerateKsckResults(KsckResultsInput input) {
summaries.emplace_back(std::move(summary));
}
}
- return results;
+ return raw_info;
}
// The order of the key-value pairs whose keys compare equivalent is the order
@@ -216,13 +215,14 @@ class KsckResultsToClusterBalanceInfoTest : public ::testing::Test {
for (auto idx = 0; idx < test_configs.size(); ++idx) {
SCOPED_TRACE(Substitute("test config index: $0", idx));
const auto& cfg = test_configs[idx];
- auto ksck_results = GenerateKsckResults(cfg.input);
+ auto raw_info = GenerateRawClusterInfo(cfg.input);
Rebalancer rebalancer(rebalancer_cfg);
- ClusterBalanceInfo cbi;
- ASSERT_OK(rebalancer.KsckResultsToClusterBalanceInfo(
- ksck_results, Rebalancer::MovesInProgress(), &cbi));
- ASSERT_EQ(cfg.ref_balance_info, cbi);
+ ClusterInfo ci;
+ ASSERT_OK(rebalancer.BuildClusterInfo(
+ raw_info, Rebalancer::MovesInProgress(), &ci));
+
+ ASSERT_EQ(cfg.ref_balance_info, ci.balance);
}
}
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/43161e5a/src/kudu/tools/rebalance_algo-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo-test.cc b/src/kudu/tools/rebalance_algo-test.cc
index cd6adbd..212819f 100644
--- a/src/kudu/tools/rebalance_algo-test.cc
+++ b/src/kudu/tools/rebalance_algo-test.cc
@@ -100,10 +100,10 @@ ostream& operator<<(ostream& o, const TableReplicaMove& move) {
return o;
}
-// Transform the definition of the test cluster into the ClusterBalanceInfo
+// Transform the definition of the test cluster into the ClusterInfo
// that is consumed by the rebalancing algorithm.
-void ClusterConfigToClusterBalanceInfo(const TestClusterConfig& tcc,
- ClusterBalanceInfo* cbi) {
+void ClusterConfigToClusterInfo(const TestClusterConfig& tcc,
+ ClusterInfo* cluster_info) {
// First verify that the configuration of the test cluster is valid.
set<string> table_ids;
for (const auto& table_replica_info : tcc.table_replicas) {
@@ -118,7 +118,8 @@ void ClusterConfigToClusterBalanceInfo(const TestClusterConfig& tcc,
CHECK_EQ(tcc.tserver_uuids.size(), uuids.size());
}
- ClusterBalanceInfo result;
+ ClusterInfo result;
+ auto& balance = result.balance;
for (size_t tserver_idx = 0; tserver_idx < tcc.tserver_uuids.size();
++tserver_idx) {
// Total replica count at the tablet server.
@@ -126,10 +127,11 @@ void ClusterConfigToClusterBalanceInfo(const TestClusterConfig& tcc,
for (const auto& table_replica_info: tcc.table_replicas) {
count += table_replica_info.num_replicas_by_server[tserver_idx];
}
- result.servers_by_total_replica_count.emplace(count, tcc.tserver_uuids[tserver_idx]);
+ balance.servers_by_total_replica_count.emplace(
+ count, tcc.tserver_uuids[tserver_idx]);
}
- auto& table_info_by_skew = result.table_info_by_skew;
+ auto& table_info_by_skew = balance.table_info_by_skew;
for (size_t table_idx = 0; table_idx < tcc.table_replicas.size(); ++table_idx) {
// Replicas of the current table per tablet server.
const vector<size_t>& replicas_count =
@@ -145,17 +147,18 @@ void ClusterConfigToClusterBalanceInfo(const TestClusterConfig& tcc,
CHECK_GE(max_count, min_count);
table_info_by_skew.emplace(max_count - min_count, std::move(info));
}
- *cbi = std::move(result);
+
+ *cluster_info = std::move(result);
}
void VerifyRebalancingMoves(const TestClusterConfig& cfg) {
vector<TableReplicaMove> moves;
{
- ClusterBalanceInfo cbi;
- ClusterConfigToClusterBalanceInfo(cfg, &cbi);
+ ClusterInfo ci;
+ ClusterConfigToClusterInfo(cfg, &ci);
TwoDimensionalGreedyAlgo algo(
TwoDimensionalGreedyAlgo::EqualSkewOption::PICK_FIRST);
- ASSERT_OK(algo.GetNextMoves(std::move(cbi), 0, &moves));
+ ASSERT_OK(algo.GetNextMoves(ci, 0, &moves));
}
EXPECT_EQ(cfg.expected_moves, moves);
}
@@ -190,9 +193,9 @@ string TestClusterConfigToDebugString(const TestClusterConfig& cfg) {
}
// Test the behavior of the algorithm when no input information is given.
-TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfoGetNextMoves) {
+TEST(RebalanceAlgoUnitTest, EmptyClusterInfoGetNextMoves) {
vector<TableReplicaMove> moves;
- const ClusterBalanceInfo info;
+ const ClusterInfo info;
ASSERT_OK(TwoDimensionalGreedyAlgo().GetNextMoves(info, 0, &moves));
EXPECT_TRUE(moves.empty());
}
@@ -202,14 +205,14 @@ TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfoGetNextMoves) {
TEST(RebalanceAlgoUnitTest, NoTableSkewInClusterBalanceInfoGetNextMoves) {
{
vector<TableReplicaMove> moves;
- const ClusterBalanceInfo info = { {}, { { 0, "ts_0" } } };
+ const ClusterInfo info = { { {}, { { 0, "ts_0" } } } };
ASSERT_OK(TwoDimensionalGreedyAlgo().GetNextMoves(info, 0, &moves));
EXPECT_TRUE(moves.empty());
}
{
vector<TableReplicaMove> moves;
- const ClusterBalanceInfo info = { {}, { { 1, "ts_0" }, } };
+ const ClusterInfo info = { { {}, { { 1, "ts_0" }, } } };
const auto s = TwoDimensionalGreedyAlgo().GetNextMoves(info, 0, &moves);
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_MATCHES(s.ToString(),
@@ -222,7 +225,7 @@ TEST(RebalanceAlgoUnitTest, NoTableSkewInClusterBalanceInfoGetNextMoves) {
// GetNextMove() when no input information is given.
-TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfoGetNextMove) {
+TEST(RebalanceAlgoUnitTest, EmptyClusterInfoGetNextMove) {
boost::optional<TableReplicaMove> move;
- const ClusterBalanceInfo info;
+ const ClusterInfo info;
const auto s = TwoDimensionalGreedyAlgo().GetNextMove(info, &move);
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
EXPECT_EQ(boost::none, move);
@@ -668,8 +671,8 @@ TEST(RebalanceAlgoUnitTest, ManyMoves) {
};
constexpr size_t kExpectedMovesNum = 200;
- ClusterBalanceInfo cbi;
- ClusterConfigToClusterBalanceInfo(kConfig, &cbi);
+ ClusterInfo ci;
+ ClusterConfigToClusterInfo(kConfig, &ci);
vector<TableReplicaMove> ref_moves;
for (size_t i = 0; i < kExpectedMovesNum; ++i) {
@@ -683,7 +686,7 @@ TEST(RebalanceAlgoUnitTest, ManyMoves) {
TwoDimensionalGreedyAlgo algo(
TwoDimensionalGreedyAlgo::EqualSkewOption::PICK_FIRST);
vector<TableReplicaMove> moves;
- ASSERT_OK(algo.GetNextMoves(cbi, 0, &moves));
+ ASSERT_OK(algo.GetNextMoves(ci, 0, &moves));
EXPECT_EQ(ref_moves, moves);
}
@@ -725,8 +728,8 @@ TEST(RebalanceAlgoUnitTest, RandomizedTest) {
// Make sure the rebalancing algorithm can balance the config.
{
SCOPED_TRACE(TestClusterConfigToDebugString(cfg));
- ClusterBalanceInfo cbi;
- ClusterConfigToClusterBalanceInfo(cfg, &cbi);
+ ClusterInfo ci;
+ ClusterConfigToClusterInfo(cfg, &ci);
TwoDimensionalGreedyAlgo algo;
boost::optional<TableReplicaMove> move;
// Set a generous upper bound on the number of moves allowed before we
@@ -734,9 +737,9 @@ TEST(RebalanceAlgoUnitTest, RandomizedTest) {
// We shouldn't need to do more moves than there are replicas.
int num_moves_ub = num_tservers * num_tables * max_replicas_per_table_and_tserver;
int num_moves = 0;
- while (!IsBalanced(cbi)) {
- ASSERT_OK(algo.GetNextMove(cbi, &move));
- ASSERT_OK(TwoDimensionalGreedyAlgo::ApplyMove(*move, &cbi));
+ while (!IsBalanced(ci.balance)) {
+ ASSERT_OK(algo.GetNextMove(ci, &move));
+ ASSERT_OK(TwoDimensionalGreedyAlgo::ApplyMove(*move, &ci.balance));
ASSERT_GE(num_moves_ub, ++num_moves) << "Too many moves! The algorithm is likely stuck";
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/43161e5a/src/kudu/tools/rebalance_algo.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo.cc b/src/kudu/tools/rebalance_algo.cc
index ca05eb8..c788761 100644
--- a/src/kudu/tools/rebalance_algo.cc
+++ b/src/kudu/tools/rebalance_algo.cc
@@ -21,12 +21,8 @@
#include <iostream>
#include <iterator>
#include <limits>
-#include <map>
-#include <memory>
#include <random>
#include <string>
-#include <unordered_map>
-#include <unordered_set>
#include <utility>
#include <vector>
@@ -38,18 +34,12 @@
#include "kudu/util/status.h"
using std::back_inserter;
-using std::cout;
-using std::endl;
-using std::make_pair;
-using std::multimap;
+using std::numeric_limits;
using std::ostringstream;
using std::set_intersection;
-using std::shared_ptr;
using std::shuffle;
using std::sort;
using std::string;
-using std::unordered_map;
-using std::unordered_set;
using std::vector;
using strings::Substitute;
@@ -107,7 +97,7 @@ Status MoveOneReplica(const string& src,
}
} // anonymous namespace
-Status RebalancingAlgo::GetNextMoves(const ClusterBalanceInfo& cluster_info,
+Status RebalancingAlgo::GetNextMoves(const ClusterInfo& cluster_info,
int max_moves_num,
vector<TableReplicaMove>* moves) {
DCHECK_LE(0, max_moves_num);
@@ -115,15 +105,16 @@ Status RebalancingAlgo::GetNextMoves(const ClusterBalanceInfo& cluster_info,
// Value of '0' is a shortcut for 'the possible maximum'.
if (max_moves_num == 0) {
- max_moves_num = std::numeric_limits<decltype(max_moves_num)>::max();
+ max_moves_num = numeric_limits<decltype(max_moves_num)>::max();
}
moves->clear();
- if (cluster_info.table_info_by_skew.empty()) {
+ const auto& balance = cluster_info.balance;
+ if (balance.table_info_by_skew.empty()) {
// Check for the consistency of the 'cluster_info' parameter: if no
// information is given on the table skew, table count for all the tablet
// servers should be 0.
- for (const auto& elem : cluster_info.servers_by_total_replica_count) {
+ for (const auto& elem : balance.servers_by_total_replica_count) {
if (elem.first != 0) {
return Status::InvalidArgument(Substitute(
"non-zero table count ($0) on tablet server ($1) while no table "
@@ -135,7 +126,7 @@ Status RebalancingAlgo::GetNextMoves(const ClusterBalanceInfo& cluster_info,
}
// Copy cluster_info so we can apply moves to the copy.
- ClusterBalanceInfo info(cluster_info);
+ ClusterInfo info(cluster_info);
for (decltype(max_moves_num) i = 0; i < max_moves_num; ++i) {
boost::optional<TableReplicaMove> move;
RETURN_NOT_OK(GetNextMove(info, &move));
@@ -143,16 +134,16 @@ Status RebalancingAlgo::GetNextMoves(const ClusterBalanceInfo& cluster_info,
// No replicas to move.
break;
}
- RETURN_NOT_OK(ApplyMove(*move, &info));
+ RETURN_NOT_OK(ApplyMove(*move, &info.balance));
moves->push_back(std::move(*move));
}
return Status::OK();
}
Status RebalancingAlgo::ApplyMove(const TableReplicaMove& move,
- ClusterBalanceInfo* cluster_info) {
+ ClusterBalanceInfo* balance_info) {
- // Copy cluster_info so we can apply moves to the copy.
+ // Copy balance_info so we can apply moves to the copy.
- ClusterBalanceInfo info(*DCHECK_NOTNULL(cluster_info));
+ ClusterBalanceInfo info(*DCHECK_NOTNULL(balance_info));
// Update the total counts.
RETURN_NOT_OK_PREPEND(
@@ -190,7 +181,7 @@ Status RebalancingAlgo::ApplyMove(const TableReplicaMove& move,
const int32_t skew = max_count - min_count;
table_info_by_skew.emplace(skew, std::move(table_info));
- std::swap(*cluster_info, info);
+ *balance_info = std::move(info);
return Status::OK();
}
@@ -202,23 +193,24 @@ TwoDimensionalGreedyAlgo::TwoDimensionalGreedyAlgo(EqualSkewOption opt)
}
Status TwoDimensionalGreedyAlgo::GetNextMove(
- const ClusterBalanceInfo& cluster_info,
+ const ClusterInfo& cluster_info,
boost::optional<TableReplicaMove>* move) {
DCHECK(move);
// Set the output to none: this fits the short-circuit cases when there is
// an issue with the parameters or there aren't any moves to return.
*move = boost::none;
+ const auto& balance_info = cluster_info.balance;
// Due to the nature of the table_info_by_skew container, the very last
// range represents the most unbalanced tables.
- const auto& table_info_by_skew = cluster_info.table_info_by_skew;
+ const auto& table_info_by_skew = balance_info.table_info_by_skew;
if (table_info_by_skew.empty()) {
return Status::InvalidArgument("no table balance information");
}
const auto max_table_skew = table_info_by_skew.rbegin()->first;
const auto& servers_by_total_replica_count =
- cluster_info.servers_by_total_replica_count;
+ balance_info.servers_by_total_replica_count;
if (servers_by_total_replica_count.empty()) {
return Status::InvalidArgument("no per-server replica count information");
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/43161e5a/src/kudu/tools/rebalance_algo.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo.h b/src/kudu/tools/rebalance_algo.h
index e099051..a32fdc7 100644
--- a/src/kudu/tools/rebalance_algo.h
+++ b/src/kudu/tools/rebalance_algo.h
@@ -69,6 +69,8 @@ struct ClusterBalanceInfo {
ServersByCountMap servers_by_total_replica_count;
};
+// Locality information for a cluster: distribution of tablet servers among
+// locations.
struct ClusterLocalityInfo {
// Location-related information: distribution of tablet servers by locations.
// Mapping 'location' --> 'identifiers of tablet servers in the location'.
@@ -78,6 +80,13 @@ struct ClusterLocalityInfo {
std::unordered_map<std::string, std::string> location_by_ts_id;
};
+// Information on a cluster as input for various rebalancing algorithms.
+// As of now, contains only ClusterBalanceInfo, but ClusterLocalityInfo
+// is to be added once corresponding location-aware algorithms are implemented.
+struct ClusterInfo {
+ ClusterBalanceInfo balance;
+};
+
// A directive to move some replica of a table between two tablet servers.
struct TableReplicaMove {
std::string table_id;
@@ -103,7 +112,7 @@ class RebalancingAlgo {
// is considered balanced.
//
// 'moves' must be non-NULL.
- virtual Status GetNextMoves(const ClusterBalanceInfo& cluster_info,
+ virtual Status GetNextMoves(const ClusterInfo& cluster_info,
int max_moves_num,
std::vector<TableReplicaMove>* moves);
protected:
@@ -111,15 +120,13 @@ class RebalancingAlgo {
// the 'move' output parameter is set to 'boost::none'.
//
// 'move' must be non-NULL.
- virtual Status GetNextMove(const ClusterBalanceInfo& cluster_info,
+ virtual Status GetNextMove(const ClusterInfo& cluster_info,
boost::optional<TableReplicaMove>* move) = 0;
- // Update the balance state in 'cluster_info' with the outcome of the move
- // 'move'. 'cluster_info' is an in-out parameter.
- //
- // 'cluster_info' must be non-NULL.
+ // Update the balance state in 'balance_info' with the outcome of the move
+ // 'move'. 'balance_info' is an in-out parameter and must be non-NULL.
static Status ApplyMove(const TableReplicaMove& move,
- ClusterBalanceInfo* cluster_info);
+ ClusterBalanceInfo* balance_info);
};
// A two-dimensional greedy rebalancing algorithm. From among moves that
@@ -140,14 +147,14 @@ class TwoDimensionalGreedyAlgo : public RebalancingAlgo {
explicit TwoDimensionalGreedyAlgo(
EqualSkewOption opt = EqualSkewOption::PICK_RANDOM);
- Status GetNextMove(const ClusterBalanceInfo& cluster_info,
+ Status GetNextMove(const ClusterInfo& cluster_info,
boost::optional<TableReplicaMove>* move) override;
private:
enum class ExtremumType { MAX, MIN, };
FRIEND_TEST(RebalanceAlgoUnitTest, RandomizedTest);
- FRIEND_TEST(RebalanceAlgoUnitTest, EmptyClusterBalanceInfoGetNextMove);
+ FRIEND_TEST(RebalanceAlgoUnitTest, EmptyClusterInfoGetNextMove);
// Compute the intersection of the least or most loaded tablet servers for a
// table with the least or most loaded tablet servers in the cluster:
http://git-wip-us.apache.org/repos/asf/kudu/blob/43161e5a/src/kudu/tools/rebalancer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
index 09af4bd..c3225f2 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer.cc
@@ -103,9 +103,13 @@ Status Rebalancer::PrintStats(std::ostream& out) {
RETURN_NOT_OK(RefreshKsckResults());
const KsckResults& results = ksck_->results();
- ClusterBalanceInfo cbi;
- RETURN_NOT_OK(KsckResultsToClusterBalanceInfo(results, MovesInProgress(), &cbi));
+ ClusterRawInfo raw_info;
+ RETURN_NOT_OK(KsckResultsToClusterRawInfo(results, &raw_info));
+ ClusterInfo ci;
+ RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
+
+ auto& cbi = ci.balance;
// Per-server replica distribution stats.
{
out << "Per-server replica distribution summary:" << endl;
@@ -215,6 +219,12 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
deadline = MonoTime::Now() + MonoDelta::FromSeconds(config_.max_run_time_sec);
}
+ ClusterRawInfo raw_info;
+ RETURN_NOT_OK(KsckResultsToClusterRawInfo(ksck_->results(), &raw_info));
+
+ ClusterInfo ci;
+ RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
+
Runner runner(config_.max_moves_per_server, deadline);
RETURN_NOT_OK(runner.Init(config_.master_addresses));
@@ -303,25 +313,28 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
return Status::OK();
}
-// Transform the information on the cluster returned by ksck into
-// ClusterBalanceInfo that could be consumed by the rebalancing algorithm,
-// taking into account pending replica movement operations. The pending
-// operations are evaluated against the state of the cluster in accordance with
-// the ksck results, and if the replica movement operations are still in
-// progress, then they are interpreted as successfully completed. The idea is to
-// prevent the algorithm outputting the same moves again while some of the
-// moves recommended at prior steps are still in progress.
-Status Rebalancer::KsckResultsToClusterBalanceInfo(
+Status Rebalancer::KsckResultsToClusterRawInfo(
const KsckResults& ksck_info,
- const MovesInProgress& moves_in_progress,
- ClusterBalanceInfo* cbi) const {
- DCHECK(cbi);
+ ClusterRawInfo* raw_info) {
+ DCHECK(raw_info);
+
+ raw_info->tserver_summaries = ksck_info.tserver_summaries;
+ raw_info->table_summaries = ksck_info.table_summaries;
+ raw_info->tablet_summaries = ksck_info.tablet_summaries;
+
+ return Status::OK();
+}
+
+Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
+ const MovesInProgress& moves_in_progress,
+ ClusterInfo* info) const {
+ DCHECK(info);
// tserver UUID --> total replica count of all table's tablets at the server
typedef unordered_map<string, int32_t> TableReplicasAtServer;
- // The result table balance information to build.
- ClusterBalanceInfo balance_info;
+ // The result information to build.
+ ClusterInfo result_info;
unordered_map<string, int32_t> tserver_replicas_count;
unordered_map<string, TableReplicasAtServer> table_replicas_info;
@@ -329,14 +342,14 @@ Status Rebalancer::KsckResultsToClusterBalanceInfo(
// Build a set of tables with RF=1 (single replica tables).
unordered_set<string> rf1_tables;
if (!config_.move_rf1_replicas) {
- for (const auto& s : ksck_info.table_summaries) {
+ for (const auto& s : raw_info.table_summaries) {
if (s.replication_factor == 1) {
rf1_tables.emplace(s.id);
}
}
}
- for (const auto& s : ksck_info.tserver_summaries) {
+ for (const auto& s : raw_info.tserver_summaries) {
if (s.health != KsckServerHealth::HEALTHY) {
LOG(INFO) << Substitute("skipping tablet server $0 ($1) because of its "
"non-HEALTHY status ($2)",
@@ -347,7 +360,7 @@ Status Rebalancer::KsckResultsToClusterBalanceInfo(
tserver_replicas_count.emplace(s.uuid, 0);
}
- for (const auto& tablet : ksck_info.tablet_summaries) {
+ for (const auto& tablet : raw_info.tablet_summaries) {
if (!config_.move_rf1_replicas) {
if (rf1_tables.find(tablet.table_id) != rf1_tables.end()) {
LOG(INFO) << Substitute("tablet $0 of table '$0' ($1) has single replica, skipping",
@@ -377,6 +390,7 @@ Status Rebalancer::KsckResultsToClusterBalanceInfo(
if (it_pending_moves != moves_in_progress.end() &&
tablet.result == KsckCheckResult::RECOVERING) {
const auto& move_info = it_pending_moves->second;
+ DCHECK(!move_info.ts_uuid_to.empty());
bool is_target_replica_present = false;
// Verify that the target replica is present in the config.
for (const auto& tr : tablet.replicas) {
@@ -422,13 +436,13 @@ Status Rebalancer::KsckResultsToClusterBalanceInfo(
}
// Populate ClusterBalanceInfo::servers_by_total_replica_count
- auto& servers_by_count = balance_info.servers_by_total_replica_count;
+ auto& servers_by_count = result_info.balance.servers_by_total_replica_count;
for (const auto& elem : tserver_replicas_count) {
servers_by_count.emplace(elem.second, elem.first);
}
// Populate ClusterBalanceInfo::table_info_by_skew
- auto& table_info_by_skew = balance_info.table_info_by_skew;
+ auto& table_info_by_skew = result_info.balance.table_info_by_skew;
for (const auto& elem : table_replicas_info) {
const auto& table_id = elem.first;
int32_t max_count = numeric_limits<int32_t>::min();
@@ -444,11 +458,12 @@ Status Rebalancer::KsckResultsToClusterBalanceInfo(
}
table_info_by_skew.emplace(max_count - min_count, std::move(tbi));
}
- *cbi = std::move(balance_info);
+ *info = std::move(result_info);
return Status::OK();
}
+
// Run one step of the rebalancer. Due to the inherent restrictions of the
// rebalancing engine, no more than one replica per tablet is moved during
// one step of the rebalancing.
@@ -474,16 +489,17 @@ Status Rebalancer::GetNextMoves(const MovesInProgress& moves_in_progress,
const size_t max_moves = config_.max_moves_per_server *
ksck_info.tserver_summaries.size() * 5;
+ ClusterRawInfo raw_info;
+ RETURN_NOT_OK(KsckResultsToClusterRawInfo(ksck_info, &raw_info));
+
replica_moves->clear();
vector<TableReplicaMove> moves;
- {
- ClusterBalanceInfo cbi;
- RETURN_NOT_OK(KsckResultsToClusterBalanceInfo(ksck_info, moves_in_progress, &cbi));
- RETURN_NOT_OK(algo_.GetNextMoves(cbi, max_moves, &moves));
- }
+ ClusterInfo cluster_info;
+ RETURN_NOT_OK(BuildClusterInfo(raw_info, moves_in_progress, &cluster_info));
+ RETURN_NOT_OK(algo_.GetNextMoves(cluster_info, max_moves, &moves));
if (moves.empty()) {
- // No suitable moves were found: the cluster described by 'cbi' is balanced,
- // assuming the pending moves, if any, will succeed.
+ // No suitable moves were found: the cluster is balanced,
+ // assuming all pending moves, if any, will succeed.
return Status::OK();
}
unordered_set<string> tablets_in_move;
http://git-wip-us.apache.org/repos/asf/kudu/blob/43161e5a/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index 3c038aa..2b13a92 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -238,18 +238,27 @@ class Rebalancer {
friend class KsckResultsToClusterBalanceInfoTest;
- // Convert ksck results into cluster balance information suitable for the
- // input of the high-level rebalancing algorithm. The 'moves_in_progress'
- // parameter contains information on the replica moves which have been
- // scheduled by a caller and still in progress: those are considered
- // as successfully completed and applied to the 'ksck_info' when building
- // ClusterBalanceInfo for the specified 'ksck_info' input. The result
- // cluster balance information is output into the 'cbi' parameter. The 'cbi'
- // output parameter cannot be null.
- Status KsckResultsToClusterBalanceInfo(
+ // Convert ksck results into information relevant to rebalancing the cluster.
+ // The 'raw' information is just a subset of the KsckResults fields: the
+ // tablet server, table, and tablet summaries are copied as-is, without any
+ // filtering. The 'raw_info' output parameter cannot be null.
+ static Status KsckResultsToClusterRawInfo(
const KsckResults& ksck_info,
- const MovesInProgress& moves_in_progress,
- ClusterBalanceInfo* cbi) const;
+ ClusterRawInfo* raw_info);
+
+ // Convert the 'raw' information about the cluster into information suitable
+ // for the input of the high-level rebalancing algorithm.
+ // The 'moves_in_progress' parameter contains information on the replica moves
+ // which have been scheduled by a caller and are still in progress: those are
+ // considered as successfully completed and applied to the 'raw_info' when
+ // building ClusterInfo for the specified 'raw_info' input. The idea
+ // is to prevent the algorithm from outputting the same moves again while
+ // some of the moves recommended at prior steps are still in progress.
+ // The resulting cluster information is output into the 'info' parameter.
+ // The 'info' output parameter cannot be null.
+ Status BuildClusterInfo(const ClusterRawInfo& raw_info,
+ const MovesInProgress& moves_in_progress,
+ ClusterInfo* info) const;
// Get next batch of replica moves from the rebalancing algorithm.
// Essentially, it runs ksck against the cluster and feeds the data into the