You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2022/10/11 05:12:04 UTC

[incubator-kvrocks] branch unstable updated: Fix server cannot exit properly when enabling cluster mode (#969)

This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new d33ef02  Fix server cannot exit properly when enabling cluster mode (#969)
d33ef02 is described below

commit d33ef02fbd76feebcdf6e1e16d0d11d8aee30da7
Author: hulk <hu...@gmail.com>
AuthorDate: Tue Oct 11 13:11:58 2022 +0800

    Fix server cannot exit properly when enabling cluster mode (#969)
---
 src/slot_migrate.cc         | 79 +++++++++++++++++++++++++++------------------
 src/slot_migrate.h          | 14 ++++++--
 tests/gocase/util/server.go |  4 +--
 3 files changed, 60 insertions(+), 37 deletions(-)

diff --git a/src/slot_migrate.cc b/src/slot_migrate.cc
index b2815a6..ed6fe58 100644
--- a/src/slot_migrate.cc
+++ b/src/slot_migrate.cc
@@ -46,11 +46,11 @@ SlotMigrate::SlotMigrate(Server *svr, int speed, int pipeline_size, int seq_gap)
   // because metadata_cf_handle_ and db_ will be destroyed if DB is reopened.
   // [Situation]:
   // 1. Start an empty slave server.
-  // 2. Connect to master which has amount of data, and trigger full sychronization.
+  // 2. Connect to a master which holds a large amount of data, and trigger full synchronization.
   // 3. After replication, change slave to master and start slot migrate.
   // 4. It will occur segment fault when using metadata_cf_handle_ to create iterator of rocksdb.
   // [Reason]:
-  // After full sychronization, DB will be reopened, db_ and metadata_cf_handle_ will be released.
+  // After full synchronization, DB will be reopened, db_ and metadata_cf_handle_ will be released.
   // Then, if we create rocksdb iterator with metadata_cf_handle_, it will go wrong.
   // [Solution]:
   // db_ and metadata_cf_handle_ will be replaced by storage_->GetDB() and storage_->GetCFHandle("metadata")
@@ -75,7 +75,7 @@ SlotMigrate::SlotMigrate(Server *svr, int speed, int pipeline_size, int seq_gap)
   migrate_slot_ = -1;
   migrate_failed_slot_ = -1;
   migrate_state_ = kMigrateNone;
-  stop_ = false;
+  stop_migrate_ = false;
   slot_snapshot_ = nullptr;
 
   if (svr->IsSlave()) {
@@ -121,10 +121,20 @@ Status SlotMigrate::MigrateStart(Server *svr, const std::string &node_id, const
   return Status::OK();
 }
 
+SlotMigrate::~SlotMigrate() {
+  if (thread_state_ == ThreadState::Running) {
+    stop_migrate_ = true;
+    thread_state_ = ThreadState::Terminated;
+    job_cv_.notify_all();
+    t_.join();
+  }
+}
+
 Status SlotMigrate::CreateMigrateHandleThread(void) {
   try {
     t_ = std::thread([this]() {
       Util::ThreadSetName("slot-migrate");
+      thread_state_ = ThreadState::Running;
       this->Loop(static_cast<void*>(this));
     });
   } catch(const std::exception &e) {
@@ -136,11 +146,15 @@ Status SlotMigrate::CreateMigrateHandleThread(void) {
 void *SlotMigrate::Loop(void *arg) {
   while (true) {
     std::unique_lock<std::mutex> ul(this->job_mutex_);
-    while (this->slot_job_ == nullptr) {
+    while (!IsTerminated() && this->slot_job_ == nullptr) {
       this->job_cv_.wait(ul);
     }
     ul.unlock();
 
+    if (IsTerminated()) {
+      return nullptr;
+    }
+
     LOG(INFO) << "[migrate] migrate_slot: " << slot_job_->migrate_slot_
               << ", dst_ip: " << slot_job_->dst_ip_
               << ", dst_port: " << slot_job_->dst_port_
@@ -155,12 +169,16 @@ void *SlotMigrate::Loop(void *arg) {
 
     StateMachine();
   }
-  return nullptr;
 }
 
 void SlotMigrate::StateMachine(void) {
   state_machine_ = kSlotMigrateStart;
   while (true) {
+    if (IsTerminated()) {
+      LOG(WARNING) << "[migrate] Will stop state machine, because the thread was terminated";
+      return;
+    }
+
     switch (state_machine_) {
       case kSlotMigrateStart: {
         auto s = Start();
@@ -281,12 +299,12 @@ Status SlotMigrate::SendSnapshot(void) {
   ComposeSlotKeyPrefix(namespace_, slot, &prefix);
   LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;
 
-  // Seek to the begining of keys start with 'prefix' and iterate all these keys
+  // Seek to the beginning of the keys that start with 'prefix' and iterate over all of them
   for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
     // The migrating task has to be stopped, if server role is changed from master to slave
     // or flush command (flushdb or flushall) is executed
-    if (stop_) {
-      LOG(ERROR) << "[migrate] Stop migrating snapshot due to the thread stoped";
+    if (stop_migrate_) {
+      LOG(ERROR) << "[migrate] Stop migrating snapshot due to the thread stopped";
       return Status(Status::NotOK);
     }
 
@@ -346,7 +364,7 @@ Status SlotMigrate::SyncWal(void) {
 }
 
 Status SlotMigrate::Success(void) {
-  if (stop_) {
+  if (stop_migrate_) {
     LOG(ERROR) << "[migrate] Stop migrating slot " << migrate_slot_;
     return Status(Status::NotOK);
   }
@@ -370,7 +388,7 @@ Status SlotMigrate::Fail(void) {
   if (!SetDstImportStatus(slot_job_->slot_fd_, kImportFailed)) {
     LOG(INFO) << "[migrate] Failed to notify the destination that data migration failed";
   }
-  // Stop slot forbiding writing
+  // Stop forbidding writes to the slot
   migrate_failed_slot_ = migrate_slot_;
   forbidden_slot_ = -1;
   return Status::OK();
@@ -443,7 +461,7 @@ bool SlotMigrate::CheckResponseOnce(int sock_fd) {
 // ltrim        Redis::SimpleString    -Err\r\n
 // linsert      Redis::Integer
 // lset         Redis::SimpleString
-// hdel         Redis::Intege
+// hdel         Redis::Integer
 // srem         Redis::Integer
 // zrem         Redis::Integer
 // lpop         Redis::NilString       $-1\r\n
@@ -460,7 +478,7 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
     return false;
   }
 
-  // Set socket recieve timeout first
+  // Set socket receive timeout first
   struct timeval tv;
   tv.tv_sec = 1;
   tv.tv_usec = 0;
@@ -472,7 +490,7 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
   stat_ = ArrayLen;
   UniqueEvbuf evbuf;
   while (true) {
-    // Read response data from socket buffer to event buffer
+    // Read response data from socket buffer to the event buffer
     if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
       LOG(ERROR) << "[migrate] Failed to read response, Err: " + std::string(strerror(errno));
       return false;
@@ -541,7 +559,6 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
       }
     }
   }
-  return true;  // Can't reach here
 }
 
 Status SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &value, std::string *restore_cmds) {
@@ -621,9 +638,9 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m
   std::string slot_key, prefix_subkey;
   AppendNamespacePrefix(key, &slot_key);
   InternalKey(slot_key, "", metadata.version, true).Encode(&prefix_subkey);
-  int itermscount = 0;
+  int item_count = 0;
   for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
-    if (stop_) {
+    if (stop_migrate_) {
       LOG(ERROR) << "[migrate] Stop migrating complex key due to task stopped";
       return false;
     }
@@ -633,7 +650,7 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m
     }
 
     // Parse values of the complex key
-    // InternalKey is adopt to get compex key's value
+    // InternalKey is adopted to get complex key's value
     // from the formatted key return by iterator of rocksdb
     InternalKey inkey(iter->key(), true);
     switch (metadata.Type()) {
@@ -669,18 +686,18 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m
         break;
     }
 
-    // Check iterms count
+    // Check item count
     // Exclude bitmap because it does not have hmset-like command
     if (metadata.Type() != kRedisBitmap) {
-      itermscount++;
-      if (itermscount >= kMaxItemsInCommand) {
+      item_count++;
+      if (item_count >= kMaxItemsInCommand) {
         *restore_cmds += Redis::MultiBulkString(user_cmd, false);
         current_pipeline_size_++;
-        itermscount = 0;
-        // Have to clear saved iterms
+        item_count = 0;
+        // Have to clear saved items
         user_cmd.erase(user_cmd.begin() + 2, user_cmd.end());
 
-        // Maybe key has amout of elements, have to check pipeline here
+        // The key may have a large number of elements, so check the pipeline here
         if (!SendCmdsPipelineIfNeed(restore_cmds, false)) {
           LOG(INFO) << "[migrate] Failed to send complex key part";
           return false;
@@ -689,8 +706,8 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m
     }
   }
 
-  // Have to check the iterm count of the last command list
-  if (itermscount % kMaxItemsInCommand) {
+  // Have to check the item count of the last command list
+  if (item_count % kMaxItemsInCommand) {
     *restore_cmds += Redis::MultiBulkString(user_cmd, false);
     current_pipeline_size_++;
   }
@@ -749,7 +766,7 @@ bool SlotMigrate::MigrateBitmapKey(const InternalKey &inkey,
 
 bool SlotMigrate::SendCmdsPipelineIfNeed(std::string *commands, bool need) {
   // Stop migrating or not
-  if (stop_) {
+  if (stop_migrate_) {
     LOG(ERROR) << "[migrate] Stop sending data due to migrating thread stopped"
                << ", current migrating slot: " << migrate_slot_;
     return false;
@@ -782,7 +799,7 @@ bool SlotMigrate::SendCmdsPipelineIfNeed(std::string *commands, bool need) {
     return false;
   }
 
-  // Clear commands and currentpipeline
+  // Clear the commands and reset the current pipeline size
   commands->clear();
   current_pipeline_size_ = 0;
   return true;
@@ -843,21 +860,21 @@ Status SlotMigrate::MigrateIncrementData(std::unique_ptr<rocksdb::TransactionLog
   std::string commands;
   commands.clear();
   while (true) {
-    if (stop_) {
+    if (stop_migrate_) {
       LOG(ERROR) << "[migrate] Migration task end during migrating WAL data";
       return Status(Status::NotOK);
     }
     auto batch = (*iter)->GetBatch();
     if (batch.sequence != next_seq) {
       LOG(ERROR) << "[migrate] WAL iterator is discrete, some seq might be lost"
-                 << ", expectd sequence: " << next_seq << ", but got sequence: " << batch.sequence;
+                 << ", expected sequence: " << next_seq << ", but got sequence: " << batch.sequence;
       return Status(Status::NotOK);
     }
 
-    // Generate commands by iterating write bacth
+    // Generate commands by iterating write batch
     auto s = GenerateCmdsFromBatch(&batch, &commands);
     if (!s.IsOK()) {
-      LOG(ERROR) << "[migrate] Failed to generate commands from wirte batch";
+      LOG(ERROR) << "[migrate] Failed to generate commands from write batch";
       return Status(Status::NotOK);
     }
 
diff --git a/src/slot_migrate.h b/src/slot_migrate.h
index 05bc283..9ac6862 100644
--- a/src/slot_migrate.h
+++ b/src/slot_migrate.h
@@ -83,7 +83,7 @@ class SlotMigrate : public Redis::Database {
  public:
   explicit SlotMigrate(Server *svr, int speed = kMigrateSpeed,
                        int pipeline_size = kPipelineSize, int seq_gap = kSeqGapLimit);
-  ~SlotMigrate() {}
+  ~SlotMigrate();
 
   Status CreateMigrateHandleThread(void);
   void *Loop(void *arg);
@@ -93,12 +93,13 @@ class SlotMigrate : public Redis::Database {
   void SetMigrateSpeedLimit(int speed) { if (speed >= 0) migrate_speed_ = speed; }
   void SetPipelineSize(uint32_t size) { if (size > 0) pipeline_size_limit_ = size; }
   void SetSequenceGapSize(int size) { if (size > 0) seq_gap_limit_ = size; }
-  void SetMigrateStopFlag(bool state) { stop_ = state; }
+  void SetMigrateStopFlag(bool state) { stop_migrate_ = state; }
   int16_t GetMigrateState() { return migrate_state_; }
   int16_t GetMigrateStateMachine() { return state_machine_; }
   int16_t GetForbiddenSlot(void) { return forbidden_slot_; }
   int16_t GetMigratingSlot(void) { return migrate_slot_; }
   void GetMigrateInfo(std::string *info);
+  bool IsTerminated() { return thread_state_ == ThreadState::Terminated; }
 
  private:
   void StateMachine(void);
@@ -145,6 +146,13 @@ class SlotMigrate : public Redis::Database {
   };
   ParserState stat_ = ArrayLen;
 
+  enum class ThreadState {
+    Uninitialized,
+    Running,
+    Terminated
+  };
+  ThreadState thread_state_ = ThreadState::Uninitialized;
+
   static const size_t kProtoInlineMaxSize = 16 * 1024L;
   static const size_t kProtoBulkMaxSize = 512 * 1024L * 1024L;
   static const int kMaxNotifyRetryTimes = 3;
@@ -169,7 +177,7 @@ class SlotMigrate : public Redis::Database {
   std::atomic<int16_t> migrate_slot_;
   int16_t migrate_failed_slot_;
   std::atomic<MigrateTaskState> migrate_state_;
-  std::atomic<bool> stop_;
+  std::atomic<bool> stop_migrate_;  // When true, the current migration is stopped, but the migration thread is not destroyed.
   std::string current_migrate_key_;
   uint64_t slot_snapshot_time_;
   const rocksdb::Snapshot *slot_snapshot_;
diff --git a/tests/gocase/util/server.go b/tests/gocase/util/server.go
index 9aaf7f4..2f6239d 100644
--- a/tests/gocase/util/server.go
+++ b/tests/gocase/util/server.go
@@ -84,9 +84,7 @@ func (s *KvrocksServer) Close() {
 
 func (s *KvrocksServer) close(keepDir bool) {
 	require.NoError(s.t, s.cmd.Process.Signal(syscall.SIGTERM))
-	// TODO: activate require after server issue resolved
-	// https://github.com/apache/incubator-kvrocks/issues/946#issuecomment-1272445443
-	f := func(err error) { /* require.NoError(s.t, err) */ }
+	f := func(err error) { require.NoError(s.t, err) }
 	timer := time.AfterFunc(defaultGracePeriod, func() {
 		require.NoError(s.t, s.cmd.Process.Kill())
 		f = func(err error) { require.EqualError(s.t, err, "signal: killed") }