You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2023/01/12 09:43:12 UTC
[incubator-kvrocks] branch unstable updated: Persist the cluster nodes info after applying the cluster topology (#1219)
This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new f1f7c10a Persist the cluster nodes info after applying the cluster topology (#1219)
f1f7c10a is described below
commit f1f7c10a30dc3387f6ffd77fe3c6efe2f393ed38
Author: hulk <hu...@gmail.com>
AuthorDate: Thu Jan 12 17:43:05 2023 +0800
Persist the cluster nodes info after applying the cluster topology (#1219)
---
src/cluster/cluster.cc | 130 ++++++++++++++++++++---
src/cluster/cluster.h | 4 +
src/commands/redis_cmd.cc | 7 ++
src/config/config.cc | 2 +
src/config/config.h | 1 +
src/server/server.cc | 7 +-
tests/cppunit/cluster_test.cc | 39 +++++++
tests/gocase/integration/cluster/cluster_test.go | 52 +++++++++
8 files changed, 227 insertions(+), 15 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index f89906bc..63c806a6 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -20,8 +20,11 @@
#include "cluster.h"
+#include <config/config_util.h>
+
#include <algorithm>
#include <cstring>
+#include <fstream>
#include <memory>
#include "commands/redis_cmd.h"
@@ -487,8 +490,46 @@ Status Cluster::GetClusterNodes(std::string *nodes_str) {
}
std::string Cluster::GenNodesDescription() {
- // Generate slots info firstly
+ UpdateSlotsInfo();
+
+ auto now = Util::GetTimeStampMS();
+ std::string nodes_desc;
+ for (const auto &item : nodes_) {
+ const std::shared_ptr<ClusterNode> n = item.second;
+
+ std::string node_str;
+ // ID, host, port
+ node_str.append(n->id_ + " ");
+ node_str.append(fmt::format("{}:{}@{} ", n->host_, n->port_, n->port_ + kClusterPortIncr));
+
+ // Flags
+ if (n->id_ == myid_) node_str.append("myself,");
+ if (n->role_ == kClusterMaster) {
+ node_str.append("master - ");
+ } else {
+ node_str.append("slave " + n->master_id_ + " ");
+ }
+
+ // Ping sent, pong received, config epoch, link status
+ node_str.append(fmt::format("{} {} {} connected", now - 1, now, version_));
+
+ if (n->role_ == kClusterMaster && n->slots_info_.size() > 0) {
+ node_str.append(" " + n->slots_info_);
+ }
+
+ nodes_desc.append(node_str + "\n");
+ }
+ return nodes_desc;
+}
+
+void Cluster::UpdateSlotsInfo() {
int start = -1;
+ // reset the previous slots info
+ for (const auto &item : nodes_) {
+ const std::shared_ptr<ClusterNode> &n = item.second;
+ n->slots_info_.clear();
+ }
+
std::shared_ptr<ClusterNode> n = nullptr;
for (int i = 0; i <= kClusterSlots; i++) {
// Find start node and slot id
@@ -511,37 +552,98 @@ std::string Cluster::GenNodesDescription() {
}
}
- std::string nodes_desc;
for (const auto &item : nodes_) {
const std::shared_ptr<ClusterNode> n = item.second;
+ if (n->slots_info_.size() > 0) n->slots_info_.pop_back(); // Remove last space
+ }
+}
+
+std::string Cluster::GenNodesInfo() {
+ UpdateSlotsInfo();
+ std::string nodes_info;
+ for (const auto &item : nodes_) {
+ const std::shared_ptr<ClusterNode> &n = item.second;
std::string node_str;
- // ID, host, port
+ node_str.append("node ");
+ // ID
node_str.append(n->id_ + " ");
- node_str.append(fmt::format("{}:{}@{} ", n->host_, n->port_, n->port_ + kClusterPortIncr));
+ // Host + Port
+ node_str.append(fmt::format("{} {} ", n->host_, n->port_));
- // Flags
- if (n->id_ == myid_) node_str.append("myself,");
+ // Role
if (n->role_ == kClusterMaster) {
node_str.append("master - ");
} else {
node_str.append("slave " + n->master_id_ + " ");
}
- // Ping sent, pong received, config epoch, link status
- auto now = Util::GetTimeStampMS();
- node_str.append(fmt::format("{} {} {} connected", now - 1, now, version_));
-
// Slots
- if (n->slots_info_.size() > 0) n->slots_info_.pop_back(); // Trim space
if (n->role_ == kClusterMaster && n->slots_info_.size() > 0) {
node_str.append(" " + n->slots_info_);
}
- n->slots_info_.clear(); // Reset
+ nodes_info.append(node_str + "\n");
+ }
+ return nodes_info;
+}
- nodes_desc.append(node_str + "\n");
+Status Cluster::DumpClusterNodes(const std::string &file) {
+ // Write the cluster nodes info to a temporary file first, then rename it to the target path
+ std::string tmp_path = file + ".tmp";
+ remove(tmp_path.data());
+ std::ofstream output_file(tmp_path, std::ios::out);
+ output_file << fmt::format("version {}\n", version_);
+ output_file << fmt::format("id {}\n", myid_);
+ output_file << GenNodesInfo();
+ output_file.close();
+ if (rename(tmp_path.data(), file.data()) < 0) {
+ return {Status::NotOK, fmt::format("rename file encounter error: {}", strerror(errno))};
}
- return nodes_desc;
+ return Status::OK();
+}
+
+Status Cluster::LoadClusterNodes(const std::string &file_path) {
+ if (rocksdb::Env::Default()->FileExists(file_path).IsNotFound()) {
+ LOG(INFO) << fmt::format("The cluster nodes file {} is not found. Use CLUSTERX subcommands to specify it.",
+ file_path);
+ return Status::OK();
+ }
+
+ std::ifstream file;
+ file.open(file_path);
+ if (!file.is_open()) {
+ return {Status::NotOK, fmt::format("error opening the file '{}': {}", file_path, strerror(errno))};
+ }
+
+ int64_t version = -1;
+ std::string id, nodesInfo;
+ std::string line;
+ while (!file.eof()) {
+ std::getline(file, line);
+
+ auto parsed = ParseConfigLine(line);
+ if (!parsed) return parsed.ToStatus().Prefixed("malformed line");
+ if (parsed->first.empty() || parsed->second.empty()) continue;
+
+ auto key = parsed->first;
+ if (key == "version") {
+ auto parse_result = ParseInt<int64_t>(parsed->second, 10);
+ if (!parse_result) {
+ return {Status::NotOK, errInvalidClusterVersion};
+ }
+ version = *parse_result;
+ } else if (key == "id") {
+ id = parsed->second;
+ if (id.length() != kClusterNodeIdLen) {
+ return {Status::NotOK, errInvalidNodeID};
+ }
+ } else if (key == "node") {
+ nodesInfo.append(parsed->second + "\n");
+ } else {
+ return {Status::NotOK, fmt::format("unknown key: {}", key)};
+ }
+ }
+ return SetClusterNodes(nodesInfo, version, false);
}
Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index d947d8c3..da551d19 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -94,11 +94,15 @@ class Cluster {
Status MigrateSlot(int slot, const std::string &dst_node_id);
Status ImportSlot(Redis::Connection *conn, int slot, int state);
std::string GetMyId() const { return myid_; }
+ Status DumpClusterNodes(const std::string &file);
+ Status LoadClusterNodes(const std::string &file_path);
static bool SubCommandIsExecExclusive(const std::string &subcommand);
private:
std::string GenNodesDescription();
+ std::string GenNodesInfo();
+ void UpdateSlotsInfo();
SlotInfo GenSlotNodeInfo(int start, int end, const std::shared_ptr<ClusterNode> &n);
Status ParseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
std::unordered_map<int, std::string> *slots_nodes);
diff --git a/src/commands/redis_cmd.cc b/src/commands/redis_cmd.cc
index 4f72c859..a66b13c9 100644
--- a/src/commands/redis_cmd.cc
+++ b/src/commands/redis_cmd.cc
@@ -5270,9 +5270,11 @@ class CommandClusterX : public Commander {
return Status::OK();
}
+ bool need_persist_nodes_info = false;
if (subcommand_ == "setnodes") {
Status s = svr->cluster_->SetClusterNodes(nodes_str_, set_version_, force_);
if (s.IsOK()) {
+ need_persist_nodes_info = true;
*output = Redis::SimpleString("OK");
} else {
*output = Redis::Error(s.Msg());
@@ -5280,6 +5282,7 @@ class CommandClusterX : public Commander {
} else if (subcommand_ == "setnodeid") {
Status s = svr->cluster_->SetNodeId(args_[2]);
if (s.IsOK()) {
+ need_persist_nodes_info = true;
*output = Redis::SimpleString("OK");
} else {
*output = Redis::Error(s.Msg());
@@ -5287,6 +5290,7 @@ class CommandClusterX : public Commander {
} else if (subcommand_ == "setslot") {
Status s = svr->cluster_->SetSlot(slot_id_, args_[4], set_version_);
if (s.IsOK()) {
+ need_persist_nodes_info = true;
*output = Redis::SimpleString("OK");
} else {
*output = Redis::Error(s.Msg());
@@ -5304,6 +5308,9 @@ class CommandClusterX : public Commander {
} else {
*output = Redis::Error("Invalid cluster command options");
}
+ if (need_persist_nodes_info) {
+ return svr->cluster_->DumpClusterNodes(svr->GetConfig()->NodesFilePath());
+ }
return Status::OK();
}
diff --git a/src/config/config.cc b/src/config/config.cc
index e0cf0f8c..5b9dce00 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -592,6 +592,8 @@ void Config::initFieldCallback() {
}
}
+std::string Config::NodesFilePath() { return dir + "/nodes.conf"; }
+
void Config::SetMaster(const std::string &host, uint32_t port) {
master_host = host;
master_port = port;
diff --git a/src/config/config.h b/src/config/config.h
index 2871f551..703b08aa 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -194,6 +194,7 @@ struct Config {
mutable std::mutex backup_mu_;
public:
+ std::string NodesFilePath();
Status Rewrite();
Status Load(const CLIOptions &path);
void Get(const std::string &key, std::vector<std::string> *values);
diff --git a/src/server/server.cc b/src/server/server.cc
index bb56b19e..6c265451 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -137,12 +137,17 @@ Status Server::Start() {
}
if (config_->cluster_enabled) {
+ auto s = cluster_->LoadClusterNodes(config_->NodesFilePath());
+ if (!s.IsOK()) {
+ LOG(ERROR) << "Failed to load cluster nodes info: " << s.Msg();
+ return Status(Status::NotOK, s.Msg());
+ }
// Create objects used for slot migration
slot_migrate_ =
std::make_unique<SlotMigrate>(this, config_->migrate_speed, config_->pipeline_size, config_->sequence_gap);
slot_import_ = new SlotImport(this);
// Create migrating thread
- auto s = slot_migrate_->CreateMigrateHandleThread();
+ s = slot_migrate_->CreateMigrateHandleThread();
if (!s.IsOK()) {
LOG(ERROR) << "Failed to create migration thread, Err: " << s.Msg();
return Status(Status::NotOK);
diff --git a/tests/cppunit/cluster_test.cc b/tests/cppunit/cluster_test.cc
index b6e256d2..865122bf 100644
--- a/tests/cppunit/cluster_test.cc
+++ b/tests/cppunit/cluster_test.cc
@@ -156,3 +156,42 @@ TEST(Cluster, CluseterGetSlotInfo) {
ASSERT_TRUE(info.nodes[0].port == 30002);
ASSERT_TRUE(info.nodes[1].id == "07c37dfeb235213a872192d90877d0cd55635b91");
}
+
+TEST(Cluster, TestDumpAndLoadClusterNodesInfo) {
+ int64_t version = 2;
+ const std::string nodes =
+ "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
+ "slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1\n"
+ "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
+ "master - 5461-10922\n"
+ "17ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30003 master - 10923-16383";
+ Cluster cluster(nullptr, {"127.0.0.1"}, 30002);
+ Status s = cluster.SetClusterNodes(nodes, version, false);
+ ASSERT_TRUE(s.IsOK());
+
+ std::string nodes_filename = "nodes.conf";
+ s = cluster.DumpClusterNodes(nodes_filename);
+ ASSERT_TRUE(s.IsOK());
+ Cluster new_cluster(nullptr, {"127.0.0.1"}, 30002);
+ s = new_cluster.LoadClusterNodes(nodes_filename);
+ ASSERT_TRUE(s.IsOK());
+ ASSERT_EQ(version, new_cluster.GetVersion());
+ std::vector<SlotInfo> slots_infos;
+ s = new_cluster.GetSlotsInfo(&slots_infos);
+ ASSERT_TRUE(s.IsOK());
+ ASSERT_EQ(2, slots_infos.size());
+ SlotInfo slot0_info = slots_infos[0];
+ ASSERT_EQ(5461, slot0_info.start);
+ ASSERT_EQ(10922, slot0_info.end);
+ ASSERT_EQ(2, slot0_info.nodes.size());
+ ASSERT_EQ(30002, slot0_info.nodes[0].port);
+ ASSERT_EQ("07c37dfeb235213a872192d90877d0cd55635b91", slot0_info.nodes[1].id);
+ SlotInfo slot1_info = slots_infos[1];
+ ASSERT_EQ(10923, slot1_info.start);
+ ASSERT_EQ(16383, slot1_info.end);
+ ASSERT_EQ(1, slot1_info.nodes.size());
+ ASSERT_EQ(30003, slot1_info.nodes[0].port);
+ ASSERT_EQ("17ed2db8d677e59ec4a4cefb06858cf2a1a89fa1", slot1_info.nodes[0].id);
+
+ unlink(nodes_filename.c_str());
+}
diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go
index dd690c27..51ee08b5 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -116,6 +116,58 @@ func TestClusterNodes(t *testing.T) {
})
}
+func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) {
+ srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer srv1.Close()
+ ctx := context.Background()
+ rdb1 := srv1.NewClient()
+ defer func() { require.NoError(t, rdb1.Close()) }()
+ nodeID1 := "07c37dfeb235213a872192d90877d0cd55635b91"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", nodeID1).Err())
+
+ srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer srv2.Close()
+ rdb2 := srv2.NewClient()
+ defer func() { require.NoError(t, rdb2.Close()) }()
+ nodeID2 := "07c37dfeb235213a872192d90877d0cd55635b92"
+ require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODEID", nodeID2).Err())
+
+ clusterNodes := fmt.Sprintf("%s %s %d master - ", nodeID1, srv1.Host(), srv1.Port())
+ clusterNodes += "0-1 2 4-8191 8192 8193 10000 10002-11002 16381 16382-16383\n"
+ clusterNodes += fmt.Sprintf("%s %s %d master -", nodeID2, srv2.Host(), srv2.Port())
+
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+ require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+ srv1.Restart()
+ slots := rdb1.ClusterSlots(ctx).Val()
+ require.Len(t, slots, 5)
+ require.EqualValues(t, 10000, slots[2].Start)
+ require.EqualValues(t, 10000, slots[2].End)
+ require.EqualValues(t, []redis.ClusterNode{{ID: nodeID1, Addr: srv1.HostPort()}}, slots[2].Nodes)
+ nodes := rdb1.ClusterNodes(ctx).Val()
+ require.Contains(t, nodes, "0-2 4-8193 10000 10002-11002 16381-16383")
+
+ newNodeID := "0123456789012345678901234567890123456789"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", newNodeID).Err())
+ srv1.Restart()
+ slots = rdb1.ClusterSlots(ctx).Val()
+ require.EqualValues(t, 10000, slots[2].Start)
+ require.EqualValues(t, 10000, slots[2].End)
+ nodes = rdb1.ClusterNodes(ctx).Val()
+ require.Contains(t, nodes, "0-2 4-8193 10000 10002-11002 16381-16383")
+
+ require.NoError(t, rdb2.Do(ctx, "clusterx", "setslot", "0", "node", nodeID2, "2").Err())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "setslot", "0", "node", nodeID2, "2").Err())
+
+ srv1.Restart()
+ srv2.Restart()
+ nodes = rdb1.ClusterNodes(ctx).Val()
+ require.Contains(t, nodes, "1-2 4-8193 10000 10002-11002 16381-16383")
+ nodes = rdb2.ClusterNodes(ctx).Val()
+ require.Contains(t, nodes, "1-2 4-8193 10000 10002-11002 16381-16383")
+}
+
func TestClusterComplexTopology(t *testing.T) {
srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer srv.Close()