You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2023/01/12 09:43:12 UTC

[incubator-kvrocks] branch unstable updated: Persist the cluster nodes info after applying the cluster topology (#1219)

This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new f1f7c10a Persist the cluster nodes info after applying the cluster topology (#1219)
f1f7c10a is described below

commit f1f7c10a30dc3387f6ffd77fe3c6efe2f393ed38
Author: hulk <hu...@gmail.com>
AuthorDate: Thu Jan 12 17:43:05 2023 +0800

    Persist the cluster nodes info after applying the cluster topology (#1219)
---
 src/cluster/cluster.cc                           | 130 ++++++++++++++++++++---
 src/cluster/cluster.h                            |   4 +
 src/commands/redis_cmd.cc                        |   7 ++
 src/config/config.cc                             |   2 +
 src/config/config.h                              |   1 +
 src/server/server.cc                             |   7 +-
 tests/cppunit/cluster_test.cc                    |  39 +++++++
 tests/gocase/integration/cluster/cluster_test.go |  52 +++++++++
 8 files changed, 227 insertions(+), 15 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index f89906bc..63c806a6 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -20,8 +20,11 @@
 
 #include "cluster.h"
 
+#include <config/config_util.h>
+
 #include <algorithm>
 #include <cstring>
+#include <fstream>
 #include <memory>
 
 #include "commands/redis_cmd.h"
@@ -487,8 +490,46 @@ Status Cluster::GetClusterNodes(std::string *nodes_str) {
 }
 
 std::string Cluster::GenNodesDescription() {
-  // Generate slots info firstly
+  UpdateSlotsInfo();  // rebuild every node's slots_info_ string before it is read below
+
+  auto now = Util::GetTimeStampMS();  // taken once, so every node line reports the same ping/pong times
+  std::string nodes_desc;
+  for (const auto &item : nodes_) {
+    const auto &n = item.second;  // bind by reference: copying the shared_ptr costs an atomic inc/dec per node
+
+    std::string node_str;
+    // ID, host, port
+    node_str.append(n->id_ + " ");
+    node_str.append(fmt::format("{}:{}@{} ", n->host_, n->port_, n->port_ + kClusterPortIncr));
+
+    // Flags
+    if (n->id_ == myid_) node_str.append("myself,");
+    if (n->role_ == kClusterMaster) {
+      node_str.append("master - ");
+    } else {
+      node_str.append("slave " + n->master_id_ + " ");
+    }
+
+    // Ping sent, pong received, config epoch, link status
+    node_str.append(fmt::format("{} {} {} connected", now - 1, now, version_));
+
+    if (n->role_ == kClusterMaster && !n->slots_info_.empty()) {
+      node_str.append(" " + n->slots_info_);
+    }
+
+    nodes_desc.append(node_str + "\n");
+  }
+  return nodes_desc;
+}
+
+void Cluster::UpdateSlotsInfo() {
   int start = -1;
+  // reset the previous slots info
+  for (const auto &item : nodes_) {
+    const std::shared_ptr<ClusterNode> &n = item.second;
+    n->slots_info_.clear();
+  }
+
   std::shared_ptr<ClusterNode> n = nullptr;
   for (int i = 0; i <= kClusterSlots; i++) {
     // Find start node and slot id
@@ -511,37 +552,98 @@ std::string Cluster::GenNodesDescription() {
     }
   }
 
-  std::string nodes_desc;
   for (const auto &item : nodes_) {
     const std::shared_ptr<ClusterNode> n = item.second;
+    if (n->slots_info_.size() > 0) n->slots_info_.pop_back();  // Remove last space
+  }
+}
+
+std::string Cluster::GenNodesInfo() {  // one "node ..." line per cluster node; DumpClusterNodes writes this out
+  UpdateSlotsInfo();  // rebuild every node's slots_info_ string before it is read below
 
+  std::string nodes_info;
+  for (const auto &item : nodes_) {
+    const std::shared_ptr<ClusterNode> &n = item.second;
     std::string node_str;
-    // ID, host, port
+    node_str.append("node ");  // "node" is the key that LoadClusterNodes dispatches on
+    // Then "<id> <host> <port> <role> [slots]", the node line format that SetClusterNodes accepts
     node_str.append(n->id_ + " ");
-    node_str.append(fmt::format("{}:{}@{} ", n->host_, n->port_, n->port_ + kClusterPortIncr));
+    // Host + Port
+    node_str.append(fmt::format("{} {} ", n->host_, n->port_));
 
-    // Flags
-    if (n->id_ == myid_) node_str.append("myself,");
+    // Role: "master -" or "slave <master id>"
     if (n->role_ == kClusterMaster) {
       node_str.append("master - ");
     } else {
       node_str.append("slave " + n->master_id_ + " ");
     }
 
-    // Ping sent, pong received, config epoch, link status
-    auto now = Util::GetTimeStampMS();
-    node_str.append(fmt::format("{} {} {} connected", now - 1, now, version_));
-
     // Slots
-    if (n->slots_info_.size() > 0) n->slots_info_.pop_back();  // Trim space
     if (n->role_ == kClusterMaster && n->slots_info_.size() > 0) {
       node_str.append(" " + n->slots_info_);
     }
-    n->slots_info_.clear();  // Reset
+    nodes_info.append(node_str + "\n");
+  }
+  return nodes_info;
+}
 
-    nodes_desc.append(node_str + "\n");
+Status Cluster::DumpClusterNodes(const std::string &file) {
+  // Write a temp file first and rename() it over `file`, so a failed dump never clobbers the previous one
+  std::string tmp_path = file + ".tmp";
+  remove(tmp_path.data());
+  std::ofstream output_file(tmp_path, std::ios::out);
+  output_file << fmt::format("version {}\nid {}\n", version_, myid_) << GenNodesInfo();
+  output_file.close();
+  // A failed open/write/close leaves the stream failed; never rename a missing or truncated file into place
+  if (!output_file) return {Status::NotOK, fmt::format("failed to write the file '{}'", tmp_path)};
+  if (rename(tmp_path.data(), file.data()) < 0) {
+    return {Status::NotOK, fmt::format("rename file encounter error: {}", strerror(errno))};
   }
-  return nodes_desc;
+  return Status::OK();
+}
+
+// Restore the state written by DumpClusterNodes(); a missing file is fine (topology comes from CLUSTERX later).
+Status Cluster::LoadClusterNodes(const std::string &file_path) {
+  if (rocksdb::Env::Default()->FileExists(file_path).IsNotFound()) {
+    LOG(INFO) << fmt::format("The cluster nodes file {} is not found. Use CLUSTERX subcommands to specify it.",
+                             file_path);
+    return Status::OK();
+  }
+
+  std::ifstream file(file_path);
+  if (!file.is_open()) {
+    return {Status::NotOK, fmt::format("error opening the file '{}': {}", file_path, strerror(errno))};
+  }
+
+  int64_t version = -1;
+  std::string id, nodes_info, line;
+  // getline() as the loop condition also stops on read errors, where a `!eof()` loop would spin forever
+  while (std::getline(file, line)) {
+    auto parsed = ParseConfigLine(line);
+    if (!parsed) return parsed.ToStatus().Prefixed("malformed line");
+    if (parsed->first.empty() || parsed->second.empty()) continue;
+    const auto &key = parsed->first;
+    if (key == "version") {
+      auto parse_result = ParseInt<int64_t>(parsed->second, 10);
+      if (!parse_result) return {Status::NotOK, errInvalidClusterVersion};
+      version = *parse_result;
+    } else if (key == "id") {
+      id = parsed->second;
+      if (id.length() != kClusterNodeIdLen) return {Status::NotOK, errInvalidNodeID};
+    } else if (key == "node") {
+      nodes_info.append(parsed->second + "\n");
+    } else {
+      return {Status::NotOK, fmt::format("unknown key: {}", key)};
+    }
+  }
+  // SETNODEID alone also triggers a dump, i.e. a file with an id but no "node" lines and possibly "version -1".
+  // NOTE(review): presumably SetClusterNodes rejects version < 0, which used to abort startup here -- confirm.
+  if (version >= 0 || !nodes_info.empty()) {
+    auto s = SetClusterNodes(nodes_info, version, false);
+    if (!s.IsOK()) return s;
+  }
+  // Re-apply the persisted id (it used to be parsed and dropped) unless SetClusterNodes already chose it.
+  return (id.empty() || id == myid_) ? Status::OK() : SetNodeId(id);
 }
 
 Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index d947d8c3..da551d19 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -94,11 +94,15 @@ class Cluster {
   Status MigrateSlot(int slot, const std::string &dst_node_id);
   Status ImportSlot(Redis::Connection *conn, int slot, int state);
   std::string GetMyId() const { return myid_; }
+  Status DumpClusterNodes(const std::string &file);
+  Status LoadClusterNodes(const std::string &file_path);
 
   static bool SubCommandIsExecExclusive(const std::string &subcommand);
 
  private:
   std::string GenNodesDescription();
+  std::string GenNodesInfo();
+  void UpdateSlotsInfo();
   SlotInfo GenSlotNodeInfo(int start, int end, const std::shared_ptr<ClusterNode> &n);
   Status ParseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
                            std::unordered_map<int, std::string> *slots_nodes);
diff --git a/src/commands/redis_cmd.cc b/src/commands/redis_cmd.cc
index 4f72c859..a66b13c9 100644
--- a/src/commands/redis_cmd.cc
+++ b/src/commands/redis_cmd.cc
@@ -5270,9 +5270,11 @@ class CommandClusterX : public Commander {
       return Status::OK();
     }
 
+    bool need_persist_nodes_info = false;
     if (subcommand_ == "setnodes") {
       Status s = svr->cluster_->SetClusterNodes(nodes_str_, set_version_, force_);
       if (s.IsOK()) {
+        need_persist_nodes_info = true;
         *output = Redis::SimpleString("OK");
       } else {
         *output = Redis::Error(s.Msg());
@@ -5280,6 +5282,7 @@ class CommandClusterX : public Commander {
     } else if (subcommand_ == "setnodeid") {
       Status s = svr->cluster_->SetNodeId(args_[2]);
       if (s.IsOK()) {
+        need_persist_nodes_info = true;
         *output = Redis::SimpleString("OK");
       } else {
         *output = Redis::Error(s.Msg());
@@ -5287,6 +5290,7 @@ class CommandClusterX : public Commander {
     } else if (subcommand_ == "setslot") {
       Status s = svr->cluster_->SetSlot(slot_id_, args_[4], set_version_);
       if (s.IsOK()) {
+        need_persist_nodes_info = true;
         *output = Redis::SimpleString("OK");
       } else {
         *output = Redis::Error(s.Msg());
@@ -5304,6 +5308,9 @@ class CommandClusterX : public Commander {
     } else {
       *output = Redis::Error("Invalid cluster command options");
     }
+    if (need_persist_nodes_info) {
+      return svr->cluster_->DumpClusterNodes(svr->GetConfig()->NodesFilePath());
+    }
     return Status::OK();
   }
 
diff --git a/src/config/config.cc b/src/config/config.cc
index e0cf0f8c..5b9dce00 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -592,6 +592,8 @@ void Config::initFieldCallback() {
   }
 }
 
+std::string Config::NodesFilePath() { return dir + "/nodes.conf"; }  // where the cluster topology is persisted
+
 void Config::SetMaster(const std::string &host, uint32_t port) {
   master_host = host;
   master_port = port;
diff --git a/src/config/config.h b/src/config/config.h
index 2871f551..703b08aa 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -194,6 +194,7 @@ struct Config {
   mutable std::mutex backup_mu_;
 
  public:
+  std::string NodesFilePath();
   Status Rewrite();
   Status Load(const CLIOptions &path);
   void Get(const std::string &key, std::vector<std::string> *values);
diff --git a/src/server/server.cc b/src/server/server.cc
index bb56b19e..6c265451 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -137,12 +137,17 @@ Status Server::Start() {
   }
 
   if (config_->cluster_enabled) {
+    auto s = cluster_->LoadClusterNodes(config_->NodesFilePath());
+    if (!s.IsOK()) {
+      LOG(ERROR) << "Failed to load cluster nodes info: " << s.Msg();
+      return Status(Status::NotOK, s.Msg());
+    }
     // Create objects used for slot migration
     slot_migrate_ =
         std::make_unique<SlotMigrate>(this, config_->migrate_speed, config_->pipeline_size, config_->sequence_gap);
     slot_import_ = new SlotImport(this);
     // Create migrating thread
-    auto s = slot_migrate_->CreateMigrateHandleThread();
+    s = slot_migrate_->CreateMigrateHandleThread();
     if (!s.IsOK()) {
       LOG(ERROR) << "Failed to create migration thread, Err: " << s.Msg();
       return Status(Status::NotOK);
diff --git a/tests/cppunit/cluster_test.cc b/tests/cppunit/cluster_test.cc
index b6e256d2..865122bf 100644
--- a/tests/cppunit/cluster_test.cc
+++ b/tests/cppunit/cluster_test.cc
@@ -156,3 +156,42 @@ TEST(Cluster, CluseterGetSlotInfo) {
   ASSERT_TRUE(info.nodes[0].port == 30002);
   ASSERT_TRUE(info.nodes[1].id == "07c37dfeb235213a872192d90877d0cd55635b91");
 }
+
+TEST(Cluster, TestDumpAndLoadClusterNodesInfo) {
+  int64_t version = 2;
+  const std::string nodes =
+      "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
+      "slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1\n"
+      "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
+      "master - 5461-10922\n"
+      "17ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30003 master - 10923-16383";
+  Cluster cluster(nullptr, {"127.0.0.1"}, 30002);
+  Status s = cluster.SetClusterNodes(nodes, version, false);
+  ASSERT_TRUE(s.IsOK());
+
+  const std::string nodes_filename = "nodes.conf";
+  s = cluster.DumpClusterNodes(nodes_filename);
+  ASSERT_TRUE(s.IsOK());
+  Cluster new_cluster(nullptr, {"127.0.0.1"}, 30002);
+  s = new_cluster.LoadClusterNodes(nodes_filename);
+  unlink(nodes_filename.c_str());  // clean up before any ASSERT_* below can return early and leak the file
+  ASSERT_TRUE(s.IsOK());
+  ASSERT_EQ(version, new_cluster.GetVersion());
+  ASSERT_EQ(cluster.GetMyId(), new_cluster.GetMyId());
+  std::vector<SlotInfo> slots_infos;
+  s = new_cluster.GetSlotsInfo(&slots_infos);
+  ASSERT_TRUE(s.IsOK());
+  ASSERT_EQ(2, slots_infos.size());
+  const SlotInfo &slot0_info = slots_infos[0];
+  ASSERT_EQ(5461, slot0_info.start);
+  ASSERT_EQ(10922, slot0_info.end);
+  ASSERT_EQ(2, slot0_info.nodes.size());
+  ASSERT_EQ(30002, slot0_info.nodes[0].port);
+  ASSERT_EQ("07c37dfeb235213a872192d90877d0cd55635b91", slot0_info.nodes[1].id);
+  const SlotInfo &slot1_info = slots_infos[1];
+  ASSERT_EQ(10923, slot1_info.start);
+  ASSERT_EQ(16383, slot1_info.end);
+  ASSERT_EQ(1, slot1_info.nodes.size());
+  ASSERT_EQ(30003, slot1_info.nodes[0].port);
+  ASSERT_EQ("17ed2db8d677e59ec4a4cefb06858cf2a1a89fa1", slot1_info.nodes[0].id);
+}
diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go
index dd690c27..51ee08b5 100644
--- a/tests/gocase/integration/cluster/cluster_test.go
+++ b/tests/gocase/integration/cluster/cluster_test.go
@@ -116,6 +116,58 @@ func TestClusterNodes(t *testing.T) {
 	})
 }
 
+func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) {
+	srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+	defer srv1.Close()
+	ctx := context.Background()
+	rdb1 := srv1.NewClient()
+	defer func() { require.NoError(t, rdb1.Close()) }()
+	nodeID1 := "07c37dfeb235213a872192d90877d0cd55635b91"
+	require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", nodeID1).Err())
+
+	srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+	defer srv2.Close()
+	rdb2 := srv2.NewClient()
+	defer func() { require.NoError(t, rdb2.Close()) }()
+	nodeID2 := "07c37dfeb235213a872192d90877d0cd55635b92"
+	require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODEID", nodeID2).Err())
+
+	clusterNodes := fmt.Sprintf("%s %s %d master - ", nodeID1, srv1.Host(), srv1.Port())
+	clusterNodes += "0-1 2 4-8191 8192 8193 10000 10002-11002 16381 16382-16383\n"
+	clusterNodes += fmt.Sprintf("%s %s %d master -", nodeID2, srv2.Host(), srv2.Port())
+
+	require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+	require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+	srv1.Restart()
+	slots := rdb1.ClusterSlots(ctx).Val()
+	require.Len(t, slots, 5)
+	require.EqualValues(t, 10000, slots[2].Start)
+	require.EqualValues(t, 10000, slots[2].End)
+	require.EqualValues(t, []redis.ClusterNode{{ID: nodeID1, Addr: srv1.HostPort()}}, slots[2].Nodes)
+	nodes := rdb1.ClusterNodes(ctx).Val()
+	require.Contains(t, nodes, "0-2 4-8193 10000 10002-11002 16381-16383")
+
+	newNodeID := "0123456789012345678901234567890123456789"
+	require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", newNodeID).Err())
+	srv1.Restart()
+	slots = rdb1.ClusterSlots(ctx).Val()
+	require.Len(t, slots, 5) // guard the slots[2] accesses: a short reply would otherwise panic
+	require.EqualValues(t, 10000, slots[2].Start)
+	require.EqualValues(t, 10000, slots[2].End)
+	require.Contains(t, rdb1.ClusterNodes(ctx).Val(), "0-2 4-8193 10000 10002-11002 16381-16383")
+
+	require.NoError(t, rdb2.Do(ctx, "clusterx", "setslot", "0", "node", nodeID2, "2").Err())
+	require.NoError(t, rdb1.Do(ctx, "clusterx", "setslot", "0", "node", nodeID2, "2").Err())
+
+	srv1.Restart()
+	srv2.Restart()
+	nodes = rdb1.ClusterNodes(ctx).Val()
+	require.Contains(t, nodes, "1-2 4-8193 10000 10002-11002 16381-16383")
+	nodes = rdb2.ClusterNodes(ctx).Val()
+	require.Contains(t, nodes, "1-2 4-8193 10000 10002-11002 16381-16383")
+}
+
 func TestClusterComplexTopology(t *testing.T) {
 	srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
 	defer srv.Close()