You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kvrocks.apache.org by "tisonkun (via GitHub)" <gi...@apache.org> on 2023/04/06 20:16:11 UTC

[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #1373: Minor refactoring of slot migration source code

tisonkun commented on code in PR #1373:
URL: https://github.com/apache/incubator-kvrocks/pull/1373#discussion_r1160214923


##########
src/cluster/slot_migrate.h:
##########
@@ -44,86 +45,75 @@
 #include "status.h"
 #include "storage/redis_db.h"
 
-constexpr const auto CLUSTER_SLOTS = HASH_SLOTS_SIZE;
+enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed };
 
-enum MigrateTaskState { kMigrateNone = 0, kMigrateStarted, kMigrateSuccess, kMigrateFailed };
-
-enum MigrateStateMachine {
-  kSlotMigrateNone,
-  kSlotMigrateStart,
-  kSlotMigrateSnapshot,
-  kSlotMigrateWal,
-  kSlotMigrateSuccess,
-  kSlotMigrateFailed,
-  kSlotMigrateClean
-};
+enum class SlotMigrationStage { kNone, kStart, kSnapshot, kWAL, kSuccess, kFailed, kClean };
 
 enum class KeyMigrationResult { kMigrated, kExpired, kUnderlyingStructEmpty };
 
-struct SlotMigrateJob {
-  SlotMigrateJob(int slot, std::string dst_ip, int port, int speed, int pipeline_size, int seq_gap)
-      : migrate_slot_(slot),
+struct SlotMigrationJob {
+  SlotMigrationJob(int slot_id, std::string dst_ip, int dst_port, int speed, int pipeline_size, int seq_gap)
+      : slot_id_(static_cast<int16_t>(slot_id)),
         dst_ip_(std::move(dst_ip)),
-        dst_port_(port),
-        speed_limit_(speed),
-        pipeline_size_(pipeline_size),
-        seq_gap_(seq_gap) {}
-  SlotMigrateJob(const SlotMigrateJob &other) = delete;
-  SlotMigrateJob &operator=(const SlotMigrateJob &other) = delete;
-  ~SlotMigrateJob() { close(slot_fd_); }
-
-  int slot_fd_ = -1;  // fd to send data to dst during migrate job
-  int migrate_slot_;
+        dst_port_(dst_port),
+        max_speed_(speed),
+        max_pipeline_size_(pipeline_size),
+        seq_gap_limit_(seq_gap) {}
+  SlotMigrationJob(const SlotMigrationJob &other) = delete;
+  SlotMigrationJob &operator=(const SlotMigrationJob &other) = delete;
+  ~SlotMigrationJob() = default;
+
+  int16_t slot_id_;
   std::string dst_ip_;
   int dst_port_;
-  int speed_limit_;
-  int pipeline_size_;
-  int seq_gap_;
+  int max_speed_;
+  int max_pipeline_size_;
+  int seq_gap_limit_;
 };
 
-class SlotMigrate : public Redis::Database {
+class SlotMigrator : public Redis::Database {
  public:
-  explicit SlotMigrate(Server *svr, int migration_speed = kDefaultMigrationSpeed,
-                       int pipeline_size_limit = kDefaultPipelineSizeLimit, int seq_gap = kDefaultSeqGapLimit);
-  SlotMigrate(const SlotMigrate &other) = delete;
-  SlotMigrate &operator=(const SlotMigrate &other) = delete;
-  ~SlotMigrate();
-
-  Status CreateMigrateHandleThread();
-  void Loop();
-  Status MigrateStart(Server *svr, const std::string &node_id, const std::string &dst_ip, int dst_port, int slot,
-                      int speed, int pipeline_size, int seq_gap);
+  explicit SlotMigrator(Server *svr, int max_migration_speed = kDefaultMaxMigrationSpeed,
+                        int max_pipeline_size = kDefaultMaxPipelineSize, int seq_gap_limit = kDefaultSequenceGapLimit);
+  SlotMigrator(const SlotMigrator &other) = delete;
+  SlotMigrator &operator=(const SlotMigrator &other) = delete;
+  ~SlotMigrator();
+
+  Status CreateMigrationThread();
+  Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id, int speed,
+                              int pipeline_size, int seq_gap);
   void ReleaseForbiddenSlot();
-  void SetMigrateSpeedLimit(int speed) {
-    if (speed >= 0) migration_speed_ = speed;
+  void SetMaxMigrationSpeed(int value) {
+    if (value >= 0) max_migration_speed_ = value;
   }
-  void SetPipelineSize(int value) {
-    if (value > 0) pipeline_size_limit_ = value;
+  void SetMaxPipelineSize(int value) {
+    if (value > 0) max_pipeline_size_ = value;
   }
-  void SetSequenceGapSize(int size) {
-    if (size > 0) seq_gap_limit_ = size;
+  void SetSequenceGapLimit(int value) {
+    if (value > 0) seq_gap_limit_ = value;
   }
-  void SetMigrateStopFlag(bool state) { stop_migrate_ = state; }
-  bool IsMigrationInProgress() const { return migrate_state_ == kMigrateStarted; }
-  int16_t GetMigrateStateMachine() const { return state_machine_; }
+  void SetStopMigrationFlag(bool value) { stop_migration_ = value; }
+  bool IsMigrationInProgress() const { return migration_state_ == MigrationState::kStarted; }
+  SlotMigrationStage GetCurrentSlotMigrationStage() const { return current_stage_; }
   int16_t GetForbiddenSlot() const { return forbidden_slot_; }
-  int16_t GetMigratingSlot() const { return migrate_slot_; }
-  void GetMigrateInfo(std::string *info) const;
-  bool IsTerminated() { return thread_state_ == ThreadState::Terminated; }
+  int16_t GetMigratingSlot() const { return migrating_slot_; }
+  void GetMigrationInfo(std::string *info) const;
 
  private:
-  void RunStateMachine();
-  Status Start();
+  void Loop();
+  void RunMigrationProcess();
+  bool IsTerminated() { return thread_state_ == ThreadState::Terminated; }
+  Status StartMigration();
   Status SendSnapshot();
-  Status SyncWal();
-  Status Success();
-  Status Fail();
+  Status SyncWAL();
+  Status FinishSuccessfulMigration();
+  Status FinishFailedMigration();

Review Comment:
   These two method name changes (`Success` -> `FinishSuccessfulMigration` and `Fail` -> `FinishFailedMigration`) seem wordy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org