You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kvrocks.apache.org by GitBox <gi...@apache.org> on 2022/12/19 17:04:06 UTC

[GitHub] [incubator-kvrocks] torwig opened a new pull request, #1197: Add support of stream migration during slot migration process

torwig opened a new pull request, #1197:
URL: https://github.com/apache/incubator-kvrocks/pull/1197

   The `XSETID` command was implemented. This command is used only during stream key migration.
   It differs from the `Redis` command of the same name in just one case: in `Redis`, you **can't** apply it to a non-existing stream; in `Kvrocks` you **can**, because streams are allowed to be empty and there must be a way to migrate an empty stream (without a prior `XADD` command, which would create the stream if it doesn't exist).
   Related C++ unit tests were added.
   
   Stream key migration was implemented. A key of the `stream` type is migrated via a separate function.
   Related Go tests were added.
   
   Cluster-related code (`src/cluster` folder) was refactored and tidied.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] PragmaTwice commented on pull request #1197: Add support of stream migration during slot migration process

Posted by GitBox <gi...@apache.org>.
PragmaTwice commented on PR #1197:
URL: https://github.com/apache/incubator-kvrocks/pull/1197#issuecomment-1360931709

   Thanks all. Merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1197: Add support of stream migration during slot migration process

Posted by GitBox <gi...@apache.org>.
torwig commented on code in PR #1197:
URL: https://github.com/apache/incubator-kvrocks/pull/1197#discussion_r1052531392


##########
src/commands/redis_cmd.cc:
##########
@@ -6254,6 +6254,66 @@ class CommandXTrim : public Commander {
   StreamTrimStrategy strategy_ = StreamTrimStrategy::None;
 };
 
+class CommandXSetId : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    stream_name_ = args[1];
+
+    auto s = Redis::ParseStreamEntryID(args[2], &last_id_);
+    if (!s.IsOK()) {
+      return {Status::RedisParseErr, s.Msg()};
+    }
+
+    if (args.size() == 3) {
+      return Status::OK();
+    }
+
+    for (size_t i = 3; i < args.size(); /* manual increment */) {
+      if (Util::ToLower(args[i]) == "entriesadded" && i + 1 < args.size()) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] PragmaTwice commented on a diff in pull request #1197: Add support of stream migration during slot migration process

Posted by GitBox <gi...@apache.org>.
PragmaTwice commented on code in PR #1197:
URL: https://github.com/apache/incubator-kvrocks/pull/1197#discussion_r1054043299


##########
src/cluster/slot_migrate.h:
##########
@@ -77,89 +83,92 @@ struct SlotMigrateJob {
 
 class SlotMigrate : public Redis::Database {
  public:
-  explicit SlotMigrate(Server *svr, int speed = kMigrateSpeed, int pipeline_size = kPipelineSize,
-                       int seq_gap = kSeqGapLimit);
+  explicit SlotMigrate(Server *svr, int migration_speed = kDefaultMigrationSpeed,
+                       int pipeline_size_limit = kDefaultPipelineSizeLimit, int seq_gap = kDefaultSeqGapLimit);
+  SlotMigrate(const SlotMigrate &other) = delete;
+  SlotMigrate &operator=(const SlotMigrate &other) = delete;
   ~SlotMigrate();
 
-  Status CreateMigrateHandleThread(void);
+  Status CreateMigrateHandleThread();
   void Loop();
   Status MigrateStart(Server *svr, const std::string &node_id, const std::string &dst_ip, int dst_port, int slot,
                       int speed, int pipeline_size, int seq_gap);
   void ReleaseForbiddenSlot();
   void SetMigrateSpeedLimit(int speed) {
-    if (speed >= 0) migrate_speed_ = speed;
+    if (speed >= 0) migration_speed_ = speed;
   }
-  void SetPipelineSize(uint32_t size) {
-    if (size > 0) pipeline_size_limit_ = size;
+  void SetPipelineSize(int value) {
+    if (value > 0) pipeline_size_limit_ = value;
   }
   void SetSequenceGapSize(int size) {
     if (size > 0) seq_gap_limit_ = size;
   }
   void SetMigrateStopFlag(bool state) { stop_migrate_ = state; }
-  int16_t GetMigrateState() { return migrate_state_; }
-  int16_t GetMigrateStateMachine() { return state_machine_; }
-  int16_t GetForbiddenSlot(void) { return forbidden_slot_; }
-  int16_t GetMigratingSlot(void) { return migrate_slot_; }
-  void GetMigrateInfo(std::string *info);
+  bool IsMigrationInProgress() const { return migrate_state_ == kMigrateStarted; }
+  int16_t GetMigrateStateMachine() const { return state_machine_; }
+  int16_t GetForbiddenSlot() const { return forbidden_slot_; }
+  int16_t GetMigratingSlot() const { return migrate_slot_; }
+  void GetMigrateInfo(std::string *info) const;
   bool IsTerminated() { return thread_state_ == ThreadState::Terminated; }
 
  private:
-  void StateMachine(void);
-  Status Start(void);
-  Status SendSnapshot(void);
-  Status SyncWal(void);
-  Status Success(void);
-  Status Fail(void);
-  Status Clean(void);
+  void StateMachine();
+  Status Start();
+  Status SendSnapshot();
+  Status SyncWal();
+  Status Success();
+  Status Fail();
+  Status Clean();
 
   bool AuthDstServer(int sock_fd, const std::string &password);
   bool SetDstImportStatus(int sock_fd, int status);
   bool CheckResponseOnce(int sock_fd);
   bool CheckResponseWithCounts(int sock_fd, int total);
 
-  Status MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &value, std::string *restore_cmds);
+  KeyMigrationResult MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata,
+                                   std::string *restore_cmds);
   bool MigrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes,
                         std::string *restore_cmds);
   bool MigrateComplexKey(const rocksdb::Slice &key, const Metadata &metadata, std::string *restore_cmds);
+  bool MigrateStream(const rocksdb::Slice &key, const StreamMetadata &metadata, std::string *restore_cmds);

Review Comment:
   We can refactor them in further PRs : )



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] caipengbo commented on a diff in pull request #1197: Add support of stream migration during slot migration process

Posted by GitBox <gi...@apache.org>.
caipengbo commented on code in PR #1197:
URL: https://github.com/apache/incubator-kvrocks/pull/1197#discussion_r1053896272


##########
src/cluster/slot_migrate.h:
##########
@@ -77,89 +83,92 @@ struct SlotMigrateJob {
 
 class SlotMigrate : public Redis::Database {
  public:
-  explicit SlotMigrate(Server *svr, int speed = kMigrateSpeed, int pipeline_size = kPipelineSize,
-                       int seq_gap = kSeqGapLimit);
+  explicit SlotMigrate(Server *svr, int migration_speed = kDefaultMigrationSpeed,
+                       int pipeline_size_limit = kDefaultPipelineSizeLimit, int seq_gap = kDefaultSeqGapLimit);
+  SlotMigrate(const SlotMigrate &other) = delete;
+  SlotMigrate &operator=(const SlotMigrate &other) = delete;
   ~SlotMigrate();
 
-  Status CreateMigrateHandleThread(void);
+  Status CreateMigrateHandleThread();
   void Loop();
   Status MigrateStart(Server *svr, const std::string &node_id, const std::string &dst_ip, int dst_port, int slot,
                       int speed, int pipeline_size, int seq_gap);
   void ReleaseForbiddenSlot();
   void SetMigrateSpeedLimit(int speed) {
-    if (speed >= 0) migrate_speed_ = speed;
+    if (speed >= 0) migration_speed_ = speed;
   }
-  void SetPipelineSize(uint32_t size) {
-    if (size > 0) pipeline_size_limit_ = size;
+  void SetPipelineSize(int value) {
+    if (value > 0) pipeline_size_limit_ = value;
   }
   void SetSequenceGapSize(int size) {
     if (size > 0) seq_gap_limit_ = size;
   }
   void SetMigrateStopFlag(bool state) { stop_migrate_ = state; }
-  int16_t GetMigrateState() { return migrate_state_; }
-  int16_t GetMigrateStateMachine() { return state_machine_; }
-  int16_t GetForbiddenSlot(void) { return forbidden_slot_; }
-  int16_t GetMigratingSlot(void) { return migrate_slot_; }
-  void GetMigrateInfo(std::string *info);
+  bool IsMigrationInProgress() const { return migrate_state_ == kMigrateStarted; }
+  int16_t GetMigrateStateMachine() const { return state_machine_; }
+  int16_t GetForbiddenSlot() const { return forbidden_slot_; }
+  int16_t GetMigratingSlot() const { return migrate_slot_; }
+  void GetMigrateInfo(std::string *info) const;
   bool IsTerminated() { return thread_state_ == ThreadState::Terminated; }
 
  private:
-  void StateMachine(void);
-  Status Start(void);
-  Status SendSnapshot(void);
-  Status SyncWal(void);
-  Status Success(void);
-  Status Fail(void);
-  Status Clean(void);
+  void StateMachine();
+  Status Start();
+  Status SendSnapshot();
+  Status SyncWal();
+  Status Success();
+  Status Fail();
+  Status Clean();
 
   bool AuthDstServer(int sock_fd, const std::string &password);
   bool SetDstImportStatus(int sock_fd, int status);
   bool CheckResponseOnce(int sock_fd);
   bool CheckResponseWithCounts(int sock_fd, int total);
 
-  Status MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &value, std::string *restore_cmds);
+  KeyMigrationResult MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata,
+                                   std::string *restore_cmds);
   bool MigrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes,
                         std::string *restore_cmds);
   bool MigrateComplexKey(const rocksdb::Slice &key, const Metadata &metadata, std::string *restore_cmds);
+  bool MigrateStream(const rocksdb::Slice &key, const StreamMetadata &metadata, std::string *restore_cmds);

Review Comment:
   By the way, functions of this type use `bool` as their return value. I think it would be more elegant to return `Status`, move the LOG information from these internal functions into the `Status` message, and return it to the upper level.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1197: Add support of stream migration during slot migration process

Posted by GitBox <gi...@apache.org>.
torwig commented on code in PR #1197:
URL: https://github.com/apache/incubator-kvrocks/pull/1197#discussion_r1052477438


##########
src/cluster/slot_migrate.cc:
##########
@@ -554,28 +564,29 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
   }
 }
 
-Status SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &value, std::string *restore_cmds) {
+KeyMigrationResult SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata,
+                                              std::string *restore_cmds) {
   std::string prefix_key;
   AppendNamespacePrefix(key, &prefix_key);
-  std::string bytes = value.ToString();
+  std::string bytes = encoded_metadata.ToString();
   Metadata metadata(kRedisNone, false);
   metadata.Decode(bytes);
-  if (metadata.Type() != kRedisString && metadata.size == 0) {
+  if (metadata.Type() != kRedisString && metadata.Type() != kRedisStream && metadata.size == 0) {
     LOG(INFO) << "[migrate] No elements of key: " << prefix_key;
-    return Status(Status::cOK, "empty");

Review Comment:
   Wow, I didn't forget :)
   I used `KeyMigrationResult` so as not to rely on error messages, because relying on them can be error-prone.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1197: Add support of stream migration during slot migration process

Posted by GitBox <gi...@apache.org>.
torwig commented on code in PR #1197:
URL: https://github.com/apache/incubator-kvrocks/pull/1197#discussion_r1052475777


##########
src/cluster/slot_migrate.cc:
##########
@@ -554,28 +564,29 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
   }
 }
 
-Status SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &value, std::string *restore_cmds) {
+KeyMigrationResult SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata,
+                                              std::string *restore_cmds) {
   std::string prefix_key;
   AppendNamespacePrefix(key, &prefix_key);
-  std::string bytes = value.ToString();
+  std::string bytes = encoded_metadata.ToString();
   Metadata metadata(kRedisNone, false);
   metadata.Decode(bytes);
-  if (metadata.Type() != kRedisString && metadata.size == 0) {
+  if (metadata.Type() != kRedisString && metadata.Type() != kRedisStream && metadata.size == 0) {
     LOG(INFO) << "[migrate] No elements of key: " << prefix_key;
-    return Status(Status::cOK, "empty");

Review Comment:
   Yes, I introduced a dedicated enum type for that but forgot to use it on this very line. I'll fix it soon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] PragmaTwice merged pull request #1197: Add support of stream migration during slot migration process

Posted by GitBox <gi...@apache.org>.
PragmaTwice merged PR #1197:
URL: https://github.com/apache/incubator-kvrocks/pull/1197


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] PragmaTwice commented on a diff in pull request #1197: Add support of stream migration during slot migration process

Posted by GitBox <gi...@apache.org>.
PragmaTwice commented on code in PR #1197:
URL: https://github.com/apache/incubator-kvrocks/pull/1197#discussion_r1052459337


##########
src/cluster/slot_migrate.cc:
##########
@@ -554,28 +564,29 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
   }
 }
 
-Status SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &value, std::string *restore_cmds) {
+KeyMigrationResult SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata,
+                                              std::string *restore_cmds) {
   std::string prefix_key;
   AppendNamespacePrefix(key, &prefix_key);
-  std::string bytes = value.ToString();
+  std::string bytes = encoded_metadata.ToString();
   Metadata metadata(kRedisNone, false);
   metadata.Decode(bytes);
-  if (metadata.Type() != kRedisString && metadata.size == 0) {
+  if (metadata.Type() != kRedisString && metadata.Type() != kRedisStream && metadata.size == 0) {
     LOG(INFO) << "[migrate] No elements of key: " << prefix_key;
-    return Status(Status::cOK, "empty");

Review Comment:
   The original code here is weird. We do not allow an OK status to have a message, because that would break the semantics of `Status`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] PragmaTwice commented on a diff in pull request #1197: Add support of stream migration during slot migration process

Posted by GitBox <gi...@apache.org>.
PragmaTwice commented on code in PR #1197:
URL: https://github.com/apache/incubator-kvrocks/pull/1197#discussion_r1052464960


##########
src/commands/redis_cmd.cc:
##########
@@ -6254,6 +6254,66 @@ class CommandXTrim : public Commander {
   StreamTrimStrategy strategy_ = StreamTrimStrategy::None;
 };
 
+class CommandXSetId : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    stream_name_ = args[1];
+
+    auto s = Redis::ParseStreamEntryID(args[2], &last_id_);
+    if (!s.IsOK()) {
+      return {Status::RedisParseErr, s.Msg()};
+    }
+
+    if (args.size() == 3) {
+      return Status::OK();
+    }
+
+    for (size_t i = 3; i < args.size(); /* manual increment */) {
+      if (Util::ToLower(args[i]) == "entriesadded" && i + 1 < args.size()) {

Review Comment:
   ```suggestion
         if (Util::EqualICase(args[i], "entriesadded") && i + 1 < args.size()) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] caipengbo commented on a diff in pull request #1197: Add support of stream migration during slot migration process

Posted by GitBox <gi...@apache.org>.
caipengbo commented on code in PR #1197:
URL: https://github.com/apache/incubator-kvrocks/pull/1197#discussion_r1053896272


##########
src/cluster/slot_migrate.h:
##########
@@ -77,89 +83,92 @@ struct SlotMigrateJob {
 
 class SlotMigrate : public Redis::Database {
  public:
-  explicit SlotMigrate(Server *svr, int speed = kMigrateSpeed, int pipeline_size = kPipelineSize,
-                       int seq_gap = kSeqGapLimit);
+  explicit SlotMigrate(Server *svr, int migration_speed = kDefaultMigrationSpeed,
+                       int pipeline_size_limit = kDefaultPipelineSizeLimit, int seq_gap = kDefaultSeqGapLimit);
+  SlotMigrate(const SlotMigrate &other) = delete;
+  SlotMigrate &operator=(const SlotMigrate &other) = delete;
   ~SlotMigrate();
 
-  Status CreateMigrateHandleThread(void);
+  Status CreateMigrateHandleThread();
   void Loop();
   Status MigrateStart(Server *svr, const std::string &node_id, const std::string &dst_ip, int dst_port, int slot,
                       int speed, int pipeline_size, int seq_gap);
   void ReleaseForbiddenSlot();
   void SetMigrateSpeedLimit(int speed) {
-    if (speed >= 0) migrate_speed_ = speed;
+    if (speed >= 0) migration_speed_ = speed;
   }
-  void SetPipelineSize(uint32_t size) {
-    if (size > 0) pipeline_size_limit_ = size;
+  void SetPipelineSize(int value) {
+    if (value > 0) pipeline_size_limit_ = value;
   }
   void SetSequenceGapSize(int size) {
     if (size > 0) seq_gap_limit_ = size;
   }
   void SetMigrateStopFlag(bool state) { stop_migrate_ = state; }
-  int16_t GetMigrateState() { return migrate_state_; }
-  int16_t GetMigrateStateMachine() { return state_machine_; }
-  int16_t GetForbiddenSlot(void) { return forbidden_slot_; }
-  int16_t GetMigratingSlot(void) { return migrate_slot_; }
-  void GetMigrateInfo(std::string *info);
+  bool IsMigrationInProgress() const { return migrate_state_ == kMigrateStarted; }
+  int16_t GetMigrateStateMachine() const { return state_machine_; }
+  int16_t GetForbiddenSlot() const { return forbidden_slot_; }
+  int16_t GetMigratingSlot() const { return migrate_slot_; }
+  void GetMigrateInfo(std::string *info) const;
   bool IsTerminated() { return thread_state_ == ThreadState::Terminated; }
 
  private:
-  void StateMachine(void);
-  Status Start(void);
-  Status SendSnapshot(void);
-  Status SyncWal(void);
-  Status Success(void);
-  Status Fail(void);
-  Status Clean(void);
+  void StateMachine();
+  Status Start();
+  Status SendSnapshot();
+  Status SyncWal();
+  Status Success();
+  Status Fail();
+  Status Clean();
 
   bool AuthDstServer(int sock_fd, const std::string &password);
   bool SetDstImportStatus(int sock_fd, int status);
   bool CheckResponseOnce(int sock_fd);
   bool CheckResponseWithCounts(int sock_fd, int total);
 
-  Status MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &value, std::string *restore_cmds);
+  KeyMigrationResult MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata,
+                                   std::string *restore_cmds);
   bool MigrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes,
                         std::string *restore_cmds);
   bool MigrateComplexKey(const rocksdb::Slice &key, const Metadata &metadata, std::string *restore_cmds);
+  bool MigrateStream(const rocksdb::Slice &key, const StreamMetadata &metadata, std::string *restore_cmds);

Review Comment:
   By the way, functions of this type use `bool` as their return value. I think it would be more elegant to return `Status`, move the LOG information from these internal functions into the `Status` message, and return it to the upper level.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org