Posted to issues@kvrocks.apache.org by "torwig (via GitHub)" <gi...@apache.org> on 2023/04/06 19:59:28 UTC

[GitHub] [incubator-kvrocks] torwig opened a new pull request, #1373: Minor refactoring of slot migration source code

torwig opened a new pull request, #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373

   Unused variables were removed.
   Some variables and methods were renamed.
   The destination socket descriptor was wrapped in `UniqueFD` and moved out of `SlotMigrationJob`, which now contains only migration parameters.
   Enum classes were used instead of plain enums.
   The logic for setting the forbidden slot was moved into `SetForbiddenSlot`, which is now called explicitly between the two WAL sync calls.
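
A minimal sketch of the descriptor and enum changes listed above, assuming a POSIX system. `ScopedFD` and `JobParams` are hypothetical stand-ins written for illustration; they are not the `UniqueFD` or `SlotMigrationJob` from the kvrocks source, whose interfaces may differ. Only `MigrationState` is copied from the diff in this PR.

```cpp
#include <fcntl.h>   // fcntl, F_GETFD
#include <unistd.h>  // close, pipe

#include <cassert>
#include <cerrno>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical RAII owner of one file descriptor (stand-in for UniqueFD).
// The descriptor is closed exactly once, when the owner is destroyed or reset.
class ScopedFD {
 public:
  ScopedFD() = default;
  explicit ScopedFD(int fd) : fd_(fd) {}
  ScopedFD(const ScopedFD &) = delete;
  ScopedFD &operator=(const ScopedFD &) = delete;
  ScopedFD(ScopedFD &&other) noexcept : fd_(other.Release()) {}
  ScopedFD &operator=(ScopedFD &&other) noexcept {
    if (this != &other) Reset(other.Release());
    return *this;
  }
  ~ScopedFD() { Reset(); }

  int Get() const { return fd_; }
  bool IsValid() const { return fd_ >= 0; }
  // Gives up ownership without closing the descriptor.
  int Release() { return std::exchange(fd_, -1); }
  // Closes the currently owned descriptor (if any) and takes ownership of fd.
  void Reset(int fd = -1) {
    if (fd_ >= 0) close(fd_);
    fd_ = fd;
  }

 private:
  int fd_ = -1;
};

// Scoped enum from the diff: enumerators must be qualified and do not
// convert to int implicitly.
enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed };

// Hypothetical parameter-only job: with no descriptor inside, the implicit
// destructor is enough and nothing needs to be closed by hand.
struct JobParams {
  int16_t slot_id;
  std::string dst_ip;
  int dst_port;
};

// Returns true if a descriptor owned by ScopedFD is closed at scope exit.
bool DescriptorClosedAfterScope() {
  int fds[2];
  if (pipe(fds) != 0) return false;
  const int read_end = fds[0];
  {
    ScopedFD reader(fds[0]);
    ScopedFD writer(fds[1]);
    ScopedFD moved = std::move(reader);  // ownership transfers, no double close
    assert(!reader.IsValid());
    assert(moved.Get() == read_end);
  }
  // fcntl(F_GETFD) fails with EBADF once the descriptor has been closed.
  return fcntl(read_end, F_GETFD) == -1 && errno == EBADF;
}
```

Keeping the descriptor in a separate owning object is what lets the job struct drop its hand-written `close()` destructor, and the scoped enum forces call sites to spell out `MigrationState::kStarted` instead of relying on a bare `kMigrateStarted`.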


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] git-hulk merged pull request #1373: Minor refactoring of slot migration source code

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk merged PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373




[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #1373: Minor refactoring of slot migration source code

Posted by "tisonkun (via GitHub)" <gi...@apache.org>.
tisonkun commented on code in PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373#discussion_r1160215837


##########
src/server/server.h:
##########
@@ -111,7 +111,7 @@ class ServerLogData {
 };
 
 class SlotImport;
-class SlotMigrate;
+class SlotMigrator;

Review Comment:
   Since the existing names are `SlotImport` and `SlotMigrate`, let's keep this one as is.





[GitHub] [incubator-kvrocks] torwig commented on pull request #1373: Minor refactoring of slot migration source code

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373#issuecomment-1499593137

   @tisonkun You are absolutely right: it's hard to review because of so many changes.




[GitHub] [incubator-kvrocks] git-hulk commented on a diff in pull request #1373: Minor refactoring of slot migration source code

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk commented on code in PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373#discussion_r1160380011


##########
src/cluster/slot_migrate.h:
##########
@@ -133,52 +123,51 @@ class SlotMigrate : public Redis::Database {
   Status MigrateStream(const rocksdb::Slice &key, const StreamMetadata &metadata, std::string *restore_cmds);
   Status MigrateBitmapKey(const InternalKey &inkey, std::unique_ptr<rocksdb::Iterator> *iter,
                           std::vector<std::string> *user_cmd, std::string *restore_cmds);
+
   Status SendCmdsPipelineIfNeed(std::string *commands, bool need);
   void ApplyMigrationSpeedLimit() const;
   Status GenerateCmdsFromBatch(rocksdb::BatchResult *batch, std::string *commands);
   Status MigrateIncrementData(std::unique_ptr<rocksdb::TransactionLogIterator> *iter, uint64_t end_seq);
-  Status SyncWalBeforeForbidSlot();
-  Status SyncWalAfterForbidSlot();
+  Status SyncWalBeforeForbiddingSlot();

Review Comment:
   Should we also change `Wal` to `WAL` to stay consistent with the earlier function `SyncWAL`?





[GitHub] [incubator-kvrocks] git-hulk commented on pull request #1373: Minor refactoring of slot migration source code

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk commented on PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373#issuecomment-1500294339

   Thanks all, merging...




[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #1373: Minor refactoring of slot migration source code

Posted by "tisonkun (via GitHub)" <gi...@apache.org>.
tisonkun commented on code in PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373#discussion_r1160214923


##########
src/cluster/slot_migrate.h:
##########
@@ -44,86 +45,75 @@
 #include "status.h"
 #include "storage/redis_db.h"
 
-constexpr const auto CLUSTER_SLOTS = HASH_SLOTS_SIZE;
+enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed };
 
-enum MigrateTaskState { kMigrateNone = 0, kMigrateStarted, kMigrateSuccess, kMigrateFailed };
-
-enum MigrateStateMachine {
-  kSlotMigrateNone,
-  kSlotMigrateStart,
-  kSlotMigrateSnapshot,
-  kSlotMigrateWal,
-  kSlotMigrateSuccess,
-  kSlotMigrateFailed,
-  kSlotMigrateClean
-};
+enum class SlotMigrationStage { kNone, kStart, kSnapshot, kWAL, kSuccess, kFailed, kClean };
 
 enum class KeyMigrationResult { kMigrated, kExpired, kUnderlyingStructEmpty };
 
-struct SlotMigrateJob {
-  SlotMigrateJob(int slot, std::string dst_ip, int port, int speed, int pipeline_size, int seq_gap)
-      : migrate_slot_(slot),
+struct SlotMigrationJob {
+  SlotMigrationJob(int slot_id, std::string dst_ip, int dst_port, int speed, int pipeline_size, int seq_gap)
+      : slot_id_(static_cast<int16_t>(slot_id)),
         dst_ip_(std::move(dst_ip)),
-        dst_port_(port),
-        speed_limit_(speed),
-        pipeline_size_(pipeline_size),
-        seq_gap_(seq_gap) {}
-  SlotMigrateJob(const SlotMigrateJob &other) = delete;
-  SlotMigrateJob &operator=(const SlotMigrateJob &other) = delete;
-  ~SlotMigrateJob() { close(slot_fd_); }
-
-  int slot_fd_ = -1;  // fd to send data to dst during migrate job
-  int migrate_slot_;
+        dst_port_(dst_port),
+        max_speed_(speed),
+        max_pipeline_size_(pipeline_size),
+        seq_gap_limit_(seq_gap) {}
+  SlotMigrationJob(const SlotMigrationJob &other) = delete;
+  SlotMigrationJob &operator=(const SlotMigrationJob &other) = delete;
+  ~SlotMigrationJob() = default;
+
+  int16_t slot_id_;
   std::string dst_ip_;
   int dst_port_;
-  int speed_limit_;
-  int pipeline_size_;
-  int seq_gap_;
+  int max_speed_;
+  int max_pipeline_size_;
+  int seq_gap_limit_;
 };
 
-class SlotMigrate : public Redis::Database {
+class SlotMigrator : public Redis::Database {
  public:
-  explicit SlotMigrate(Server *svr, int migration_speed = kDefaultMigrationSpeed,
-                       int pipeline_size_limit = kDefaultPipelineSizeLimit, int seq_gap = kDefaultSeqGapLimit);
-  SlotMigrate(const SlotMigrate &other) = delete;
-  SlotMigrate &operator=(const SlotMigrate &other) = delete;
-  ~SlotMigrate();
-
-  Status CreateMigrateHandleThread();
-  void Loop();
-  Status MigrateStart(Server *svr, const std::string &node_id, const std::string &dst_ip, int dst_port, int slot,
-                      int speed, int pipeline_size, int seq_gap);
+  explicit SlotMigrator(Server *svr, int max_migration_speed = kDefaultMaxMigrationSpeed,
+                        int max_pipeline_size = kDefaultMaxPipelineSize, int seq_gap_limit = kDefaultSequenceGapLimit);
+  SlotMigrator(const SlotMigrator &other) = delete;
+  SlotMigrator &operator=(const SlotMigrator &other) = delete;
+  ~SlotMigrator();
+
+  Status CreateMigrationThread();
+  Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id, int speed,
+                              int pipeline_size, int seq_gap);
   void ReleaseForbiddenSlot();
-  void SetMigrateSpeedLimit(int speed) {
-    if (speed >= 0) migration_speed_ = speed;
+  void SetMaxMigrationSpeed(int value) {
+    if (value >= 0) max_migration_speed_ = value;
   }
-  void SetPipelineSize(int value) {
-    if (value > 0) pipeline_size_limit_ = value;
+  void SetMaxPipelineSize(int value) {
+    if (value > 0) max_pipeline_size_ = value;
   }
-  void SetSequenceGapSize(int size) {
-    if (size > 0) seq_gap_limit_ = size;
+  void SetSequenceGapLimit(int value) {
+    if (value > 0) seq_gap_limit_ = value;
   }
-  void SetMigrateStopFlag(bool state) { stop_migrate_ = state; }
-  bool IsMigrationInProgress() const { return migrate_state_ == kMigrateStarted; }
-  int16_t GetMigrateStateMachine() const { return state_machine_; }
+  void SetStopMigrationFlag(bool value) { stop_migration_ = value; }
+  bool IsMigrationInProgress() const { return migration_state_ == MigrationState::kStarted; }
+  SlotMigrationStage GetCurrentSlotMigrationStage() const { return current_stage_; }
   int16_t GetForbiddenSlot() const { return forbidden_slot_; }
-  int16_t GetMigratingSlot() const { return migrate_slot_; }
-  void GetMigrateInfo(std::string *info) const;
-  bool IsTerminated() { return thread_state_ == ThreadState::Terminated; }
+  int16_t GetMigratingSlot() const { return migrating_slot_; }
+  void GetMigrationInfo(std::string *info) const;
 
  private:
-  void RunStateMachine();
-  Status Start();
+  void Loop();
+  void RunMigrationProcess();
+  bool IsTerminated() { return thread_state_ == ThreadState::Terminated; }
+  Status StartMigration();
   Status SendSnapshot();
-  Status SyncWal();
-  Status Success();
-  Status Fail();
+  Status SyncWAL();
+  Status FinishSuccessfulMigration();
+  Status FinishFailedMigration();

Review Comment:
   These two method name changes seem wordy. 





[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1373: Minor refactoring of slot migration source code

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373#discussion_r1160492977


##########
src/cluster/slot_migrate.h:
##########
@@ -133,52 +123,51 @@ class SlotMigrate : public Redis::Database {
   Status MigrateStream(const rocksdb::Slice &key, const StreamMetadata &metadata, std::string *restore_cmds);
   Status MigrateBitmapKey(const InternalKey &inkey, std::unique_ptr<rocksdb::Iterator> *iter,
                           std::vector<std::string> *user_cmd, std::string *restore_cmds);
+
   Status SendCmdsPipelineIfNeed(std::string *commands, bool need);
   void ApplyMigrationSpeedLimit() const;
   Status GenerateCmdsFromBatch(rocksdb::BatchResult *batch, std::string *commands);
   Status MigrateIncrementData(std::unique_ptr<rocksdb::TransactionLogIterator> *iter, uint64_t end_seq);
-  Status SyncWalBeforeForbidSlot();
-  Status SyncWalAfterForbidSlot();
+  Status SyncWalBeforeForbiddingSlot();

Review Comment:
   Done





[GitHub] [incubator-kvrocks] torwig commented on pull request #1373: Minor refactoring of slot migration source code

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373#issuecomment-1500104538

   @PragmaTwice I'll handle `SlotImport` in the next PRs.




[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1373: Minor refactoring of slot migration source code

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373#discussion_r1160231245


##########
src/server/server.h:
##########
@@ -111,7 +111,7 @@ class ServerLogData {
 };
 
 class SlotImport;
-class SlotMigrate;
+class SlotMigrator;

Review Comment:
   I got your point. The reason is that I didn't touch `slot_import.h/.cc` in this PR.





[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1373: Minor refactoring of slot migration source code

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373#discussion_r1160234056


##########
src/cluster/slot_migrate.h:
##########
@@ -44,86 +45,75 @@
 #include "status.h"
 #include "storage/redis_db.h"
 
-constexpr const auto CLUSTER_SLOTS = HASH_SLOTS_SIZE;
+enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed };
 
-enum MigrateTaskState { kMigrateNone = 0, kMigrateStarted, kMigrateSuccess, kMigrateFailed };
-
-enum MigrateStateMachine {
-  kSlotMigrateNone,
-  kSlotMigrateStart,
-  kSlotMigrateSnapshot,
-  kSlotMigrateWal,
-  kSlotMigrateSuccess,
-  kSlotMigrateFailed,
-  kSlotMigrateClean
-};
+enum class SlotMigrationStage { kNone, kStart, kSnapshot, kWAL, kSuccess, kFailed, kClean };
 
 enum class KeyMigrationResult { kMigrated, kExpired, kUnderlyingStructEmpty };
 
-struct SlotMigrateJob {
-  SlotMigrateJob(int slot, std::string dst_ip, int port, int speed, int pipeline_size, int seq_gap)
-      : migrate_slot_(slot),
+struct SlotMigrationJob {
+  SlotMigrationJob(int slot_id, std::string dst_ip, int dst_port, int speed, int pipeline_size, int seq_gap)
+      : slot_id_(static_cast<int16_t>(slot_id)),
         dst_ip_(std::move(dst_ip)),
-        dst_port_(port),
-        speed_limit_(speed),
-        pipeline_size_(pipeline_size),
-        seq_gap_(seq_gap) {}
-  SlotMigrateJob(const SlotMigrateJob &other) = delete;
-  SlotMigrateJob &operator=(const SlotMigrateJob &other) = delete;
-  ~SlotMigrateJob() { close(slot_fd_); }
-
-  int slot_fd_ = -1;  // fd to send data to dst during migrate job
-  int migrate_slot_;
+        dst_port_(dst_port),
+        max_speed_(speed),
+        max_pipeline_size_(pipeline_size),
+        seq_gap_limit_(seq_gap) {}
+  SlotMigrationJob(const SlotMigrationJob &other) = delete;
+  SlotMigrationJob &operator=(const SlotMigrationJob &other) = delete;
+  ~SlotMigrationJob() = default;
+
+  int16_t slot_id_;
   std::string dst_ip_;
   int dst_port_;
-  int speed_limit_;
-  int pipeline_size_;
-  int seq_gap_;
+  int max_speed_;
+  int max_pipeline_size_;
+  int seq_gap_limit_;
 };
 
-class SlotMigrate : public Redis::Database {
+class SlotMigrator : public Redis::Database {
  public:
-  explicit SlotMigrate(Server *svr, int migration_speed = kDefaultMigrationSpeed,
-                       int pipeline_size_limit = kDefaultPipelineSizeLimit, int seq_gap = kDefaultSeqGapLimit);
-  SlotMigrate(const SlotMigrate &other) = delete;
-  SlotMigrate &operator=(const SlotMigrate &other) = delete;
-  ~SlotMigrate();
-
-  Status CreateMigrateHandleThread();
-  void Loop();
-  Status MigrateStart(Server *svr, const std::string &node_id, const std::string &dst_ip, int dst_port, int slot,
-                      int speed, int pipeline_size, int seq_gap);
+  explicit SlotMigrator(Server *svr, int max_migration_speed = kDefaultMaxMigrationSpeed,
+                        int max_pipeline_size = kDefaultMaxPipelineSize, int seq_gap_limit = kDefaultSequenceGapLimit);
+  SlotMigrator(const SlotMigrator &other) = delete;
+  SlotMigrator &operator=(const SlotMigrator &other) = delete;
+  ~SlotMigrator();
+
+  Status CreateMigrationThread();
+  Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id, int speed,
+                              int pipeline_size, int seq_gap);
   void ReleaseForbiddenSlot();
-  void SetMigrateSpeedLimit(int speed) {
-    if (speed >= 0) migration_speed_ = speed;
+  void SetMaxMigrationSpeed(int value) {
+    if (value >= 0) max_migration_speed_ = value;
   }
-  void SetPipelineSize(int value) {
-    if (value > 0) pipeline_size_limit_ = value;
+  void SetMaxPipelineSize(int value) {
+    if (value > 0) max_pipeline_size_ = value;
   }
-  void SetSequenceGapSize(int size) {
-    if (size > 0) seq_gap_limit_ = size;
+  void SetSequenceGapLimit(int value) {
+    if (value > 0) seq_gap_limit_ = value;
   }
-  void SetMigrateStopFlag(bool state) { stop_migrate_ = state; }
-  bool IsMigrationInProgress() const { return migrate_state_ == kMigrateStarted; }
-  int16_t GetMigrateStateMachine() const { return state_machine_; }
+  void SetStopMigrationFlag(bool value) { stop_migration_ = value; }
+  bool IsMigrationInProgress() const { return migration_state_ == MigrationState::kStarted; }
+  SlotMigrationStage GetCurrentSlotMigrationStage() const { return current_stage_; }
   int16_t GetForbiddenSlot() const { return forbidden_slot_; }
-  int16_t GetMigratingSlot() const { return migrate_slot_; }
-  void GetMigrateInfo(std::string *info) const;
-  bool IsTerminated() { return thread_state_ == ThreadState::Terminated; }
+  int16_t GetMigratingSlot() const { return migrating_slot_; }
+  void GetMigrationInfo(std::string *info) const;
 
  private:
-  void RunStateMachine();
-  Status Start();
+  void Loop();
+  void RunMigrationProcess();
+  bool IsTerminated() { return thread_state_ == ThreadState::Terminated; }
+  Status StartMigration();
   Status SendSnapshot();
-  Status SyncWal();
-  Status Success();
-  Status Fail();
+  Status SyncWAL();
+  Status FinishSuccessfulMigration();
+  Status FinishFailedMigration();

Review Comment:
   I thought that the new names clearly describe what the methods do.





[GitHub] [incubator-kvrocks] torwig commented on pull request #1373: Minor refactoring of slot migration source code

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373#issuecomment-1500080477

   @PragmaTwice Could you please have a look at my PR? As @tisonkun advised, I can split it into several distinct PRs to make the review more convenient. What do you think?

