Posted to issues@kvrocks.apache.org by "caipengbo (via GitHub)" <gi...@apache.org> on 2023/06/30 11:24:18 UTC

[GitHub] [kvrocks] caipengbo opened a new pull request, #1534: Slot migration based on raw key value

caipengbo opened a new pull request, #1534:
URL: https://github.com/apache/kvrocks/pull/1534

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] PragmaTwice commented on a diff in pull request #1534: Support rapid slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1264646666


##########
src/cluster/slot_migrate.cc:
##########
@@ -1110,3 +1162,224 @@ void SlotMigrator::resumeSyncCtx(const Status &migrate_result) {
     blocking_context_ = nullptr;
   }
 }
+
+void SlotMigrator::SetMigrateBatchRateLimit(size_t bytes_per_sec) {
+  migrate_batch_bytes_per_sec_ =
+      (bytes_per_sec == 0 || bytes_per_sec > kMaxMigrateBatchRate) ? kMaxMigrateBatchRate : bytes_per_sec;
+  migrate_batch_rate_limiter_->SetBytesPerSecond(static_cast<int64_t>(migrate_batch_bytes_per_sec_));
+}
+
+MigrateType SlotMigrator::GetMigrateType() { return static_cast<MigrateType>(svr_->GetConfig()->migrate_type); }
+
+Status SlotMigrator::getClockSkew(int64_t *diff_us) {
+  assert(dst_redis_context_ != nullptr);
+  uint64_t send_timestamp = util::GetTimeStampUS();
+  auto *reply = static_cast<redisReply *>(redisCommand(dst_redis_context_, "TIME"));
+  uint64_t receive_timestamp = util::GetTimeStampUS();
+
+  auto exit = MakeScopeExit([reply] { freeReplyObject(reply); });
+
+  if (dst_redis_context_->err != 0) {
+    return {Status::NotOK, std::string(dst_redis_context_->errstr)};
+  }
+
+  if (reply == nullptr) {
+    return {Status::NotOK, "get null reply from TIME command"};
+  }
+
+  if (reply->type == REDIS_REPLY_ERROR) {
+    auto error_str = std::string(reply->str);
+    return {Status::NotOK, error_str};
+  }
+
+  if (reply->elements != 2) {
+    return {Status::NotOK, "get invalid reply from TIME command"};
+  }
+
+  uint64_t dst_timestamp = std::stoul(reply->element[0]->str) * 1000000 + std::stoul(reply->element[1]->str);

Review Comment:
   `std::stoul` throws an exception when parsing fails, so I wonder whether the parsing will always succeed here?
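
A minimal sketch of a non-throwing alternative to `std::stoul`, assuming C++17 and `std::from_chars`. The helper names `ParseU64` and `TimeReplyToMicros` are hypothetical and are not part of the PR; the sketch only shows how the two string fields of a `TIME` reply could be validated before being combined.

```cpp
#include <charconv>
#include <cstdint>
#include <optional>
#include <string_view>
#include <system_error>

// Hypothetical helper: parses a non-negative decimal integer without throwing.
// Returns std::nullopt on empty input, stray characters, or overflow.
std::optional<uint64_t> ParseU64(std::string_view s) {
  if (s.empty()) return std::nullopt;
  uint64_t value = 0;
  const char *begin = s.data();
  const char *end = begin + s.size();
  auto [ptr, ec] = std::from_chars(begin, end, value);
  // Require that the whole input was consumed, not just a numeric prefix.
  if (ec != std::errc() || ptr != end) return std::nullopt;
  return value;
}

// Hypothetical helper: combines the (seconds, microseconds) fields of a TIME
// reply into a single microsecond timestamp. Overflow of the multiplication
// is not checked in this sketch.
std::optional<uint64_t> TimeReplyToMicros(std::string_view seconds, std::string_view micros) {
  auto sec = ParseU64(seconds);
  auto usec = ParseU64(micros);
  if (!sec || !usec) return std::nullopt;
  return *sec * 1000000 + *usec;
}
```

With a helper like this, a malformed reply could be turned into a `Status::NotOK` return instead of an uncaught exception.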





[GitHub] [kvrocks] PragmaTwice commented on pull request #1534: Support rapid slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1670545265

   > We certainly can't just arbitrarily import a library, so I've introduced the most official, and simplest library — hiredis.
   
   I know that hiredis is the official library, but it doesn't seem to answer our question.
   
   I would like to provide more specific explanations and understand the context of introducing hiredis:
   
   In the current codebase, such as in replication, when we need to construct a client connection, we directly build a raw TCP connection and then use our own redis protocol tools (such as redis::SimpleString and other protocol encoding/decoding functions).
   
   These processes are not well encapsulated and there are even some manual protocol constructions, such as `SockSend(..., "+OK\r\n")`, which is an existing issue.
   
   Of course, we can introduce a library to solve this problem. However, the following points need to be considered when introducing it:
   - #1630 adds TLS support for client connections. How can we rewrite that with hiredis? (We do not want multiple ways to construct a client connection.)
   - There are some code style problems, as hiredis is a C library.
   - How do we replace the existing code (e.g., in replication) with hiredis?
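
To make the "manual protocol construction" concern concrete, here is a minimal, self-contained sketch of RESP encoding helpers. The names in the `sketch` namespace are hypothetical and are not kvrocks' own `redis::` helpers; the sketch only illustrates the kind of encoding that a hand-written string such as `"+OK\r\n"` bypasses.

```cpp
#include <string>
#include <string_view>
#include <vector>

namespace sketch {

// RESP simple string: "+<text>\r\n"
std::string SimpleString(std::string_view text) {
  std::string out = "+";
  out.append(text);
  out.append("\r\n");
  return out;
}

// RESP bulk string: "$<length>\r\n<data>\r\n" (binary safe)
std::string BulkString(std::string_view data) {
  std::string out = "$" + std::to_string(data.size()) + "\r\n";
  out.append(data);
  out.append("\r\n");
  return out;
}

// A client command is sent as an array of bulk strings: "*<count>\r\n" + items
std::string EncodeCommand(const std::vector<std::string> &args) {
  std::string out = "*" + std::to_string(args.size()) + "\r\n";
  for (const auto &arg : args) out += BulkString(arg);
  return out;
}

}  // namespace sketch
```

For example, `sketch::EncodeCommand({"TIME"})` yields `*1\r\n$4\r\nTIME\r\n`, which is what a client library produces internally when asked to send `TIME`.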
   




[GitHub] [kvrocks] PragmaTwice commented on pull request #1534: Slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1631965406

   It seems some memory leaks were found by ASAN.
   
   https://github.com/apache/kvrocks/actions/runs/5527836049/jobs/10085132766?pr=1534




[GitHub] [kvrocks] git-hulk commented on pull request #1534: Slot migration based on raw key value

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1631873557

   cool, thanks for your great efforts!




[GitHub] [kvrocks] PragmaTwice commented on a diff in pull request #1534: Support rapid slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1264648460


##########
src/common/io_util.cc:
##########
@@ -404,4 +406,23 @@ Status Write(int fd, const std::string &data) { return WriteImpl<write>(fd, data
 
 Status Pwrite(int fd, const std::string &data, off_t offset) { return WriteImpl<pwrite>(fd, data, offset); }
 
+Status CreateRedisContextFromConnectedFd(int fd, int timeout, redisContext **redis_context) {
+  *redis_context = redisConnectFd(fd);
+  if (*redis_context == nullptr) {
+    return {Status::NotOK, "init failed"};
+  }
+
+  if ((*redis_context)->err != 0) {
+    auto error_str = std::string((*redis_context)->errstr);
+    redisFree(*redis_context);
+    return {Status::NotOK, error_str};
+  }
+
+  if (redisSetTimeout(*redis_context, timeval{timeout, 0}) != REDIS_OK) {
+    redisFree(*redis_context);
+    return {Status::NotOK, "set timeout failed"};
+  }
+  return Status::OK();
+}

Review Comment:
   ```suggestion
   using StaticRedisFree = StaticFunction<decltype(redisFree), redisFree>;
   
   struct UniqueRedisContext : std::unique_ptr<redisContext, StaticRedisFree> {
     using BaseType = std::unique_ptr<redisContext, StaticRedisFree>;
     using BaseType::BaseType;
   };
   
   StatusOr<UniqueRedisContext> CreateRedisContextFromConnectedFd(int fd, int timeout) {
     auto redis_context = UniqueRedisContext{redisConnectFd(fd)};
     if (!redis_context) {
       return {Status::NotOK, "init failed"};
     }
   
     if (redis_context->err != 0) {
       return {Status::NotOK, redis_context->errstr};
     }
   
     if (redisSetTimeout(redis_context.get(), timeval{timeout, 0}) != REDIS_OK) {
       return {Status::NotOK, "set timeout failed"};
     }
     return redis_context;
   }
   ```
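
The ownership pattern in the suggestion above can be tried out without hiredis. The following is a minimal sketch, assuming C++17: `StaticFn` is a hypothetical stand-in for the `StaticFunction` helper named in the suggestion, and `Conn`, `ConnOpen`, and `ConnFree` are toy replacements for `redisContext`, `redisConnectFd`, and `redisFree`.

```cpp
#include <memory>
#include <utility>

// Hypothetical stand-in for StaticFunction: a stateless functor that always
// forwards to the function F, so it can serve as a unique_ptr deleter type.
template <auto F>
struct StaticFn {
  template <typename... Args>
  auto operator()(Args &&...args) const {
    return F(std::forward<Args>(args)...);
  }
};

// Toy C-style resource standing in for redisContext.
struct Conn {
  int fd;
};

int g_live_conns = 0;  // counts live resources so that leaks become visible

Conn *ConnOpen(int fd) {
  ++g_live_conns;
  return new Conn{fd};
}

void ConnFree(Conn *c) {
  --g_live_conns;
  delete c;
}

using UniqueConn = std::unique_ptr<Conn, StaticFn<ConnFree>>;

// Returns an owning pointer, or null on failure. Every early return releases
// the resource automatically, so no manual ConnFree call is needed.
UniqueConn OpenChecked(int fd) {
  UniqueConn conn{ConnOpen(fd)};
  if (conn->fd < 0) return nullptr;  // conn is freed here by its deleter
  return conn;
}
```

The design point is that the error paths no longer have to remember the matching free call, which is where leaks tend to be introduced.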





[GitHub] [kvrocks] PragmaTwice commented on a diff in pull request #1534: Support rapid slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1264645638


##########
src/cluster/batch_sender.cc:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "batch_sender.h"
+
+#include "hiredis.h"
+#include "scope_exit.h"
+
+Status BatchSender::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value) {
+  if (pending_entries_ == 0 && !prefix_logdata_.empty()) {
+    // add prefix_logdata_ when the entry is first putted
+    auto s = PutLogData(prefix_logdata_);
+    if (!s.IsOK()) {
+      return s;
+    }
+  }
+  auto s = write_batch_.Put(cf, key, value);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("put key value to migrate batch failed, {}", s.ToString())};
+  }
+
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+Status BatchSender::Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key) {
+  auto s = write_batch_.Delete(cf, key);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("delete key from migrate batch failed, {}", s.ToString())};
+  }
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+Status BatchSender::PutLogData(const rocksdb::Slice &blob) {
+  auto s = write_batch_.PutLogData(blob);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("put log data to migrate batch failed, {}", s.ToString())};
+  }
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+void BatchSender::SetPrefixLogData(const std::string &prefix_logdata) { prefix_logdata_ = prefix_logdata; }
+
+Status BatchSender::Send() {
+  if (pending_entries_ == 0) {
+    return Status::OK();
+  }
+
+  auto s = sendBatchSetCmd(slot_, dst_redis_context_, write_batch_);
+  if (!s.IsOK()) {
+    return {Status::NotOK, fmt::format("BATCHSET command failed, {}", s.Msg())};
+  }
+
+  sent_bytes_ += write_batch_.GetDataSize();
+  sent_batches_num_++;
+  pending_entries_ = 0;
+  write_batch_.Clear();
+  return Status::OK();
+}
+
+Status BatchSender::sendBatchSetCmd(int16_t slot, redisContext *redis_context, const rocksdb::WriteBatch &write_batch) {
+  if (redis_context == nullptr) {
+    return {Status::NotOK, "redis context is null"};
+  }
+
+  auto *reply = static_cast<redisReply *>(
+      redisCommand(redis_context, "BATCHSET %d %b", slot, write_batch.Data().c_str(), write_batch.GetDataSize()));
+  auto exit = MakeScopeExit([reply] {
+    if (reply != nullptr) {
+      freeReplyObject(reply);
+    }
+  });
+
+  if (redis_context->err != 0) {
+    return {Status::NotOK, std::string(redis_context->errstr)};

Review Comment:
   ```suggestion
       return {Status::NotOK, redis_context->errstr};
   ```



##########
src/cluster/batch_sender.cc:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "batch_sender.h"
+
+#include "hiredis.h"
+#include "scope_exit.h"
+
+Status BatchSender::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value) {
+  if (pending_entries_ == 0 && !prefix_logdata_.empty()) {
+    // add prefix_logdata_ when the entry is first putted
+    auto s = PutLogData(prefix_logdata_);
+    if (!s.IsOK()) {
+      return s;
+    }
+  }
+  auto s = write_batch_.Put(cf, key, value);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("put key value to migrate batch failed, {}", s.ToString())};
+  }
+
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+Status BatchSender::Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key) {
+  auto s = write_batch_.Delete(cf, key);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("delete key from migrate batch failed, {}", s.ToString())};
+  }
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+Status BatchSender::PutLogData(const rocksdb::Slice &blob) {
+  auto s = write_batch_.PutLogData(blob);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("put log data to migrate batch failed, {}", s.ToString())};
+  }
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+void BatchSender::SetPrefixLogData(const std::string &prefix_logdata) { prefix_logdata_ = prefix_logdata; }
+
+Status BatchSender::Send() {
+  if (pending_entries_ == 0) {
+    return Status::OK();
+  }
+
+  auto s = sendBatchSetCmd(slot_, dst_redis_context_, write_batch_);
+  if (!s.IsOK()) {
+    return {Status::NotOK, fmt::format("BATCHSET command failed, {}", s.Msg())};
+  }
+
+  sent_bytes_ += write_batch_.GetDataSize();
+  sent_batches_num_++;
+  pending_entries_ = 0;
+  write_batch_.Clear();
+  return Status::OK();
+}
+
+Status BatchSender::sendBatchSetCmd(int16_t slot, redisContext *redis_context, const rocksdb::WriteBatch &write_batch) {
+  if (redis_context == nullptr) {
+    return {Status::NotOK, "redis context is null"};
+  }
+
+  auto *reply = static_cast<redisReply *>(
+      redisCommand(redis_context, "BATCHSET %d %b", slot, write_batch.Data().c_str(), write_batch.GetDataSize()));
+  auto exit = MakeScopeExit([reply] {
+    if (reply != nullptr) {
+      freeReplyObject(reply);
+    }

Review Comment:
   It seems the check still exists here.





[GitHub] [kvrocks] PragmaTwice commented on a diff in pull request #1534: Slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1260635209


##########
src/cluster/batch_sender.cc:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "batch_sender.h"
+
+#include "hiredis.h"
+#include "scope_exit.h"
+
+Status BatchSender::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value) {
+  if (pending_entries_ == 0 && !prefix_logdata_.empty()) {
+    // add prefix_logdata_ when the entry is first putted
+    auto s = PutLogData(prefix_logdata_);
+    if (!s.IsOK()) {
+      return s;
+    }
+  }
+  auto s = write_batch_.Put(cf, key, value);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("put key value to migrate batch failed, {}", s.ToString())};
+  }
+
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+Status BatchSender::Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key) {
+  auto s = write_batch_.Delete(cf, key);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("delete key from migrate batch failed, {}", s.ToString())};
+  }
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+Status BatchSender::PutLogData(const rocksdb::Slice &blob) {
+  auto s = write_batch_.PutLogData(blob);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("put log data to migrate batch failed, {}", s.ToString())};
+  }
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+void BatchSender::SetPrefixLogData(const std::string &prefix_logdata) { prefix_logdata_ = prefix_logdata; }
+
+Status BatchSender::Send() {
+  if (pending_entries_ == 0) {
+    return Status::OK();
+  }
+
+  auto s = sendBatchSetCmd(slot_, dst_redis_context_, write_batch_);
+  if (!s.IsOK()) {
+    return {Status::NotOK, fmt::format("BATCHSET command failed, {}", s.Msg())};
+  }
+
+  sent_bytes_ += write_batch_.GetDataSize();
+  sent_batches_num_++;
+  pending_entries_ = 0;
+  write_batch_.Clear();
+  return Status::OK();
+}
+
+Status BatchSender::sendBatchSetCmd(int16_t slot, redisContext *redis_context, const rocksdb::WriteBatch &write_batch) {
+  if (redis_context == nullptr) {
+    return {Status::NotOK, "redis context is null"};
+  }
+
+  auto *reply = static_cast<redisReply *>(
+      redisCommand(redis_context, "BATCHSET %d %b", slot, write_batch.Data().c_str(), write_batch.GetDataSize()));
+  auto exit = MakeScopeExit([reply] {
+    if (reply != nullptr) {
+      freeReplyObject(reply);
+    }

Review Comment:
   The nullptr check seems unnecessary here.





[GitHub] [kvrocks] PragmaTwice commented on pull request #1534: Support rapid slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1670513493

   Regarding the introduction of hiredis, a few things need to be clarified:
   - How good is the TLS support in hiredis? Can we directly pass `SSL_CTX` and `ssl_st` to it?
   - We need a C++ wrapper. **kvrocks is not written in C**, and we need to follow modern C++ principles. Raw C style is not welcome.




[GitHub] [kvrocks] PragmaTwice commented on a diff in pull request #1534: Slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1260627867


##########
kvrocks.conf:
##########
@@ -501,7 +501,17 @@ compaction-checker-range 0-7
 #
 # rename-command KEYS ""
 
-################################ MIGRATE #####################################
+################################ MIGRATION #####################################
+# Slot migration supports two ways:
+# - redis_command: Migrate data by redis serialization protocol(RESP).
+# - raw_key_value: Migrate the raw key value data of the storage engine directly.

Review Comment:
   `redis-command` and `raw-key-value`?







[GitHub] [kvrocks] caipengbo commented on pull request #1534: Support rapid slot migration based on raw key value

Posted by "caipengbo (via GitHub)" <gi...@apache.org>.
caipengbo commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1670522789

   `hiredis` isn't used much at the moment, so I've included a lightweight `hiredis`.
   
   There is no TLS requirement, so we can simply wrap `hiredis` for now.
   
   In the future, if we need more, we can introduce a more complex client, such as [redis-plus-plus](https://github.com/sewenew/redis-plus-plus). For now, I think a lightweight library like `hiredis` will do.




[GitHub] [kvrocks] PragmaTwice commented on pull request #1534: Support rapid slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1670524271

   > There is no TLS requirement
   
   TLS support for replication is an ongoing effort, and we will support TLS for slot migration in the future. So we need to consider these problems now.




[GitHub] [kvrocks] caipengbo commented on pull request #1534: Support rapid slot migration based on raw key value

Posted by "caipengbo (via GitHub)" <gi...@apache.org>.
caipengbo commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1670527218

   In the future, if there is a need, we can import other libraries and use this library on other existing code. For now, we can start with a simple library.




Re: [PR] Support rapid slot migration based on raw key value [kvrocks]

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk closed pull request #1534: Support rapid slot migration based on raw key value
URL: https://github.com/apache/kvrocks/pull/1534




[GitHub] [kvrocks] PragmaTwice commented on a diff in pull request #1534: Slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1260657849


##########
kvrocks.conf:
##########
@@ -501,7 +501,17 @@ compaction-checker-range 0-7
 #
 # rename-command KEYS ""
 
-################################ MIGRATE #####################################
+################################ MIGRATION #####################################
+# Slot migration supports two ways:
+# - redis_command: Migrate data by redis serialization protocol(RESP).
+# - raw_key_value: Migrate the raw key value data of the storage engine directly.

Review Comment:
   I mean dashes instead of underscores.





[GitHub] [kvrocks] caipengbo commented on a diff in pull request #1534: Support rapid slot migration based on raw key value

Posted by "caipengbo (via GitHub)" <gi...@apache.org>.
caipengbo commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1264647731


##########
src/cluster/slot_migrate.cc:
##########
@@ -1110,3 +1162,224 @@ void SlotMigrator::resumeSyncCtx(const Status &migrate_result) {
     blocking_context_ = nullptr;
   }
 }
+
+void SlotMigrator::SetMigrateBatchRateLimit(size_t bytes_per_sec) {
+  migrate_batch_bytes_per_sec_ =
+      (bytes_per_sec == 0 || bytes_per_sec > kMaxMigrateBatchRate) ? kMaxMigrateBatchRate : bytes_per_sec;
+  migrate_batch_rate_limiter_->SetBytesPerSecond(static_cast<int64_t>(migrate_batch_bytes_per_sec_));
+}
+
+MigrateType SlotMigrator::GetMigrateType() { return static_cast<MigrateType>(svr_->GetConfig()->migrate_type); }
+
+Status SlotMigrator::getClockSkew(int64_t *diff_us) {
+  assert(dst_redis_context_ != nullptr);
+  uint64_t send_timestamp = util::GetTimeStampUS();
+  auto *reply = static_cast<redisReply *>(redisCommand(dst_redis_context_, "TIME"));
+  uint64_t receive_timestamp = util::GetTimeStampUS();
+
+  auto exit = MakeScopeExit([reply] { freeReplyObject(reply); });
+
+  if (dst_redis_context_->err != 0) {
+    return {Status::NotOK, std::string(dst_redis_context_->errstr)};
+  }
+
+  if (reply == nullptr) {
+    return {Status::NotOK, "get null reply from TIME command"};
+  }
+
+  if (reply->type == REDIS_REPLY_ERROR) {
+    auto error_str = std::string(reply->str);
+    return {Status::NotOK, error_str};
+  }
+
+  if (reply->elements != 2) {
+    return {Status::NotOK, "get invalid reply from TIME command"};
+  }
+
+  uint64_t dst_timestamp = std::stoul(reply->element[0]->str) * 1000000 + std::stoul(reply->element[1]->str);

Review Comment:
   Yes, the reply to a normal `TIME` command will parse successfully.





[GitHub] [kvrocks] caipengbo commented on pull request #1534: Support rapid slot migration based on raw key value

Posted by "caipengbo (via GitHub)" <gi...@apache.org>.
caipengbo commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1670534471

   We certainly can't just arbitrarily import a library, so I've included the most official, and simplest library — `hiredis`.




[GitHub] [kvrocks] git-hulk commented on pull request #1534: Support rapid slot migration based on raw key value

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1670561945

   I'm fine with importing hiredis, which can make read/write operations easier, but for other libraries like redis-plus-plus, we need a solid reason for why we need them.




[GitHub] [kvrocks] caipengbo commented on a diff in pull request #1534: Support rapid slot migration based on raw key value

Posted by "caipengbo (via GitHub)" <gi...@apache.org>.
caipengbo commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1287821143


##########
src/cluster/batch_sender.cc:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "batch_sender.h"
+
+#include "hiredis.h"
+#include "scope_exit.h"
+
+Status BatchSender::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value) {
+  if (pending_entries_ == 0 && !prefix_logdata_.empty()) {

Review Comment:
   If the data is too large to fit in one `MigrateBatch`, it needs to be split into multiple `MigrateBatches`, and for each `MigrateBatch`, a `log data` header is appended.





[GitHub] [kvrocks] PragmaTwice commented on a diff in pull request #1534: Support rapid slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1264645742


##########
src/cluster/batch_sender.cc:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "batch_sender.h"
+
+#include "hiredis.h"
+#include "scope_exit.h"
+
+Status BatchSender::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value) {
+  if (pending_entries_ == 0 && !prefix_logdata_.empty()) {
+    // add prefix_logdata_ when the entry is first putted
+    auto s = PutLogData(prefix_logdata_);
+    if (!s.IsOK()) {
+      return s;
+    }
+  }
+  auto s = write_batch_.Put(cf, key, value);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("put key value to migrate batch failed, {}", s.ToString())};
+  }
+
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+Status BatchSender::Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key) {
+  auto s = write_batch_.Delete(cf, key);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("delete key from migrate batch failed, {}", s.ToString())};
+  }
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+Status BatchSender::PutLogData(const rocksdb::Slice &blob) {
+  auto s = write_batch_.PutLogData(blob);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("put log data to migrate batch failed, {}", s.ToString())};
+  }
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+void BatchSender::SetPrefixLogData(const std::string &prefix_logdata) { prefix_logdata_ = prefix_logdata; }
+
+Status BatchSender::Send() {
+  if (pending_entries_ == 0) {
+    return Status::OK();
+  }
+
+  auto s = sendBatchSetCmd(slot_, dst_redis_context_, write_batch_);
+  if (!s.IsOK()) {
+    return {Status::NotOK, fmt::format("BATCHSET command failed, {}", s.Msg())};
+  }
+
+  sent_bytes_ += write_batch_.GetDataSize();
+  sent_batches_num_++;
+  pending_entries_ = 0;
+  write_batch_.Clear();
+  return Status::OK();
+}
+
+Status BatchSender::sendBatchSetCmd(int16_t slot, redisContext *redis_context, const rocksdb::WriteBatch &write_batch) {
+  if (redis_context == nullptr) {
+    return {Status::NotOK, "redis context is null"};
+  }
+
+  auto *reply = static_cast<redisReply *>(
+      redisCommand(redis_context, "BATCHSET %d %b", slot, write_batch.Data().c_str(), write_batch.GetDataSize()));
+  auto exit = MakeScopeExit([reply] {
+    if (reply != nullptr) {
+      freeReplyObject(reply);
+    }
+  });
+
+  if (redis_context->err != 0) {
+    return {Status::NotOK, std::string(redis_context->errstr)};
+  }
+
+  if (reply == nullptr) {
+    return {Status::NotOK, "get null reply"};
+  }
+
+  if (reply->type == REDIS_REPLY_ERROR) {
+    auto error_str = std::string(reply->str);
+    return {Status::NotOK, error_str};

Review Comment:
   Actually, I do not understand why a temporary string is constructed here.
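
To illustrate the point with a minimal sketch: when a status type takes its message as `std::string`, a `const char *` converts implicitly inside the braced return, so no named temporary is needed. `ToyStatus` and `FromReplyError` below are hypothetical stand-ins; the real kvrocks `Status` class is assumed, not shown, to accept a string message in a similar way.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical, simplified status type used only for this illustration.
struct ToyStatus {
  enum Code { OK, NotOK };
  ToyStatus(Code code, std::string msg) : code(code), msg(std::move(msg)) {}
  Code code;
  std::string msg;
};

// Builds an error status directly from a C string.
// The const char* is converted to std::string by the constructor call,
// so there is no need for an intermediate `auto error_str = std::string(...)`.
// The pointer must be non-null; constructing std::string from nullptr is undefined.
ToyStatus FromReplyError(const char *reply_str) {
  return {ToyStatus::NotOK, reply_str};
}
```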





[GitHub] [kvrocks] PragmaTwice commented on a diff in pull request #1534: Support rapid slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1264648460


##########
src/common/io_util.cc:
##########
@@ -404,4 +406,23 @@ Status Write(int fd, const std::string &data) { return WriteImpl<write>(fd, data
 
 Status Pwrite(int fd, const std::string &data, off_t offset) { return WriteImpl<pwrite>(fd, data, offset); }
 
+Status CreateRedisContextFromConnectedFd(int fd, int timeout, redisContext **redis_context) {
+  *redis_context = redisConnectFd(fd);
+  if (*redis_context == nullptr) {
+    return {Status::NotOK, "init failed"};
+  }
+
+  if ((*redis_context)->err != 0) {
+    auto error_str = std::string((*redis_context)->errstr);
+    redisFree(*redis_context);
+    return {Status::NotOK, error_str};
+  }
+
+  if (redisSetTimeout(*redis_context, timeval{timeout, 0}) != REDIS_OK) {
+    redisFree(*redis_context);
+    return {Status::NotOK, "set timeout failed"};
+  }
+  return Status::OK();
+}

Review Comment:
   ```suggestion
   using StaticRedisFree = StaticFunction<decltype(redisFree), redisFree>;
   
   struct UniqueRedisContext : std::unique_ptr<redisContext, StaticRedisFree> {
     using BaseType = std::unique_ptr<redisContext, StaticRedisFree>;
     using BaseType::BaseType;
   };
   
   StatusOr<UniqueRedisContext> CreateRedisContextFromConnectedFd(int fd, int timeout) {
     auto redis_context = UniqueRedisContext{redisConnectFd(fd)};
     if (!redis_context) {
       return {Status::NotOK, "init failed"};
     }
   
     if (redis_context->err != 0) {
       return {Status::NotOK, redis_context->errstr};
     }
   
     if (redisSetTimeout(redis_context.get(), timeval{timeout, 0}) != REDIS_OK) {
       return {Status::NotOK, "set timeout failed"};
     }
     return redis_context;
   }
   ```
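
The ownership pattern in this suggestion can be tried in isolation. The sketch below is self-contained and deliberately avoids hiredis: `FakeContext`, `FakeConnect`, and `FakeFree` are made-up stand-ins for `redisContext`, `redisConnectFd`, and `redisFree`, and a hand-written deleter struct takes the place of the `StaticFunction` helper, whose definition is not shown in this thread.

```cpp
#include <cassert>
#include <memory>

// Fake C-style resource standing in for redisContext; purely illustrative.
struct FakeContext {
  int err;
};

static int g_live_contexts = 0;  // counts contexts that have not been freed yet

FakeContext *FakeConnect(int err) {
  ++g_live_contexts;
  return new FakeContext{err};
}

void FakeFree(FakeContext *c) {
  --g_live_contexts;
  delete c;
}

// Stateless deleter that always calls FakeFree, so the unique_ptr stays pointer-sized.
struct FakeFreeDeleter {
  void operator()(FakeContext *c) const { FakeFree(c); }
};
using UniqueFakeContext = std::unique_ptr<FakeContext, FakeFreeDeleter>;

// Returns an empty pointer on failure. No explicit FakeFree call is needed on
// the error path: the local owner releases the context when it goes out of scope.
UniqueFakeContext CreateContext(int err) {
  UniqueFakeContext ctx{FakeConnect(err)};
  if (ctx->err != 0) {
    return nullptr;
  }
  return ctx;
}
```

Compared with the raw-pointer version in the diff, each early return no longer has to remember its own cleanup call, which is the kind of omission that manual `redisFree` calls tend to invite.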





[GitHub] [kvrocks] caipengbo commented on pull request #1534: Support rapid slot migration based on raw key value

Posted by "caipengbo (via GitHub)" <gi...@apache.org>.
caipengbo commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1670553407

   I understand what you mean: you want a client library that handles all the things you mentioned above.
   
   I introduced the `hiredis` context only to make sending requests and receiving responses easier. I have also privately written a multi-threaded version of the kvrocks migration tool that uses `hiredis`.
   
   I don't yet have answers for the things you mentioned, nor do I know how to approach them, so introducing a simple, easily replaceable library is the best option for now. If we come up with a better way to handle them, we can easily replace `hiredis` (reuse is better).




[GitHub] [kvrocks] PragmaTwice commented on pull request #1534: Support rapid slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1670531933

   > In the future, if there is a need, we can import other libraries and use this library on other existing code.
   
   We cannot introduce libraries so casually.
   
   When introducing a library, we need to make sure that it really meets our requirements or simplifies the code, and that it is easy to maintain. We will not introduce a library at random in the early stages and then add other, similar libraries later when problems arise: that is not something a well-managed project should do, because having multiple libraries that do the same thing makes the code worse. We need to consider carefully before introducing any library.
   
   I am not against introducing hiredis, but I believe these issues should be thought through clearly when introducing it, **instead of "randomly introducing a simple library and considering what comes next later."**
   
   [hiredis seems to support TLS](https://github.com/redis/hiredis/blob/master/hiredis_ssl.h), but I'm not sure if it can reuse our SSL_CTX instead of reconstructing it.
   
   cc @git-hulk




[GitHub] [kvrocks] caipengbo commented on a diff in pull request #1534: Support rapid slot migration based on raw key value

Posted by "caipengbo (via GitHub)" <gi...@apache.org>.
caipengbo commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1287822730


##########
src/cluster/migrate_iterator.cc:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "cluster/migrate_iterator.h"
+
+#include "db_util.h"
+#include "storage/redis_db.h"
+
+MigrateIterator::MigrateIterator(engine::Storage *storage, const rocksdb::ReadOptions &read_options)
+    : metadata_cf_(storage->GetCFHandle(engine::kMetadataColumnFamilyName)),
+      subkey_cf_(storage->GetCFHandle(engine::kSubkeyColumnFamilyName)),
+      zset_score_cf_(storage->GetCFHandle(engine::kZSetScoreColumnFamilyName)),
+      stream_cf_(storage->GetCFHandle(engine::kStreamColumnFamilyName)),
+      metadata_iter_(util::UniqueIterator(storage, read_options, metadata_cf_)),
+      subdata_iter_(util::UniqueIterator(storage, read_options, subkey_cf_)),
+      stream_iter_(util::UniqueIterator(storage, read_options, stream_cf_)),
+      valid_(false),
+      metadata_(RedisType::kRedisNone, false) {}
+
+bool MigrateIterator::Valid() const { return valid_; }
+
+void MigrateIterator::Seek(const rocksdb::Slice &target) {
+  items_.clear();
+  log_data_.clear();
+  metadata_iter_->Reset();
+  subdata_iter_->Reset();
+  stream_iter_->Reset();
+  metakey_prefix_.clear();
+
+  metadata_iter_->Seek(target);
+  valid_ = metadata_iter_->Valid() && metadata_iter_->key().starts_with(target);
+  if (valid_) {
+    metakey_prefix_ = target.ToString();
+    findMetaData();
+  }
+}
+
+void MigrateIterator::Next() {
+  assert(valid_);
+  valid_ = false;
+  items_.clear();
+
+  if (subdata_iter_->Valid()) {
+    subdata_iter_->Next();
+    valid_ = subdata_iter_->Valid() && subdata_iter_->key().starts_with(subkey_prefix_);
+    if (!valid_) {
+      subdata_iter_->Reset();
+    }
+  } else if (stream_iter_->Valid()) {
+    stream_iter_->Next();
+    valid_ = stream_iter_->Valid() && stream_iter_->key().starts_with(subkey_prefix_);
+    if (!valid_) {
+      stream_iter_->Reset();
+    }
+  }
+
+  if (valid_) {
+    findSubData();
+  } else if (metadata_iter_->Valid()) {
+    metadata_iter_->Next();
+    valid_ = metadata_iter_->Valid() && metadata_iter_->key().starts_with(metakey_prefix_);
+    if (valid_) {
+      findMetaData();
+    }
+  }
+}
+
+const std::vector<MigrateItem> &MigrateIterator::GetItems() const {
+  assert(valid_);
+  return items_;
+}
+
+std::string MigrateIterator::GetLogData() const {
+  assert(valid_);
+  return log_data_;
+}
+
+void MigrateIterator::findMetaData() {
+  assert(metadata_iter_->Valid());
+  Metadata metadata(kRedisNone, false /* generate_version */);
+  metadata.Decode(metadata_iter_->value().ToString());
+  RedisType redis_type = metadata.Type();
+  metadata_ = metadata;
+
+  redis::WriteBatchLogData log_data(redis_type);
+  if (redis_type == RedisType::kRedisList) {
+    log_data.SetArguments({std::to_string(RedisCommand::kRedisCmdRPush)});
+  }
+
+  log_data_ = log_data.Encode();
+
+  items_.push_back(MigrateItem{metadata_cf_, metadata_iter_->key().ToString(), metadata_iter_->value().ToString()});
+
+  if (redis_type == RedisType::kRedisStream) {

Review Comment:
   Yes, it looks fine





[GitHub] [kvrocks] caipengbo commented on a diff in pull request #1534: Support rapid slot migration based on raw key value

Posted by "caipengbo (via GitHub)" <gi...@apache.org>.
caipengbo commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1287821143


##########
src/cluster/batch_sender.cc:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "batch_sender.h"
+
+#include "hiredis.h"
+#include "scope_exit.h"
+
+Status BatchSender::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value) {
+  if (pending_entries_ == 0 && !prefix_logdata_.empty()) {

Review Comment:
   If the data is too large to fit in one `MigrateBatch`, it needs to be split into multiple `MigrateBatches`, and the log data is automatically added to each `MigrateBatch`.
   
   When adding metadata, we call `BatchSender::PutLogData` explicitly to add the log data.
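
The splitting behaviour described above can be sketched with a toy model. `ToyBatchSender` below is not the kvrocks `BatchSender`: it stores entries as strings, flushes on an entry count rather than a byte size, and flushes from inside `Put`. All of these are simplifying assumptions. The one property it is meant to show is that every batch starts with the log data header.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a batch sender; each flushed batch is kept as a list of
// entries so that its layout can be inspected afterwards.
class ToyBatchSender {
 public:
  explicit ToyBatchSender(size_t max_entries) : max_entries_(max_entries) {}

  void SetPrefixLogData(std::string prefix) { prefix_logdata_ = std::move(prefix); }

  void Put(const std::string &key_value) {
    // The first entry of every new batch is preceded by the log data header.
    if (pending_.empty() && !prefix_logdata_.empty()) {
      pending_.push_back("LOG:" + prefix_logdata_);
    }
    pending_.push_back("KV:" + key_value);
    // Assumed flush policy: send once the batch holds max_entries_ entries.
    if (pending_.size() >= max_entries_) Send();
  }

  void Send() {
    if (pending_.empty()) return;
    sent_.push_back(std::move(pending_));
    pending_.clear();  // leave the moved-from vector in a known empty state
  }

  const std::vector<std::vector<std::string>> &Sent() const { return sent_; }

 private:
  size_t max_entries_;
  std::string prefix_logdata_;
  std::vector<std::string> pending_;
  std::vector<std::vector<std::string>> sent_;
};
```

With a limit of three entries per batch, putting four key-values produces two batches, and each of them begins with the same header. That is why the header is only written when the pending batch is empty.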





[GitHub] [kvrocks] caipengbo commented on pull request #1534: Support rapid slot migration based on raw key value

Posted by "caipengbo (via GitHub)" <gi...@apache.org>.
caipengbo commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1637024232

   > I recommend you to add unique_ptr wrappers for redisContext and redisReply pointers. You can refer to [event_util.h](https://github.com/apache/kvrocks/blob/unstable/src/common/event_util.h).
   
   




[GitHub] [kvrocks] PragmaTwice commented on a diff in pull request #1534: Support rapid slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1264645698


##########
src/cluster/batch_sender.cc:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "batch_sender.h"
+
+#include "hiredis.h"
+#include "scope_exit.h"
+
+Status BatchSender::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value) {
+  if (pending_entries_ == 0 && !prefix_logdata_.empty()) {
+    // add prefix_logdata_ when the entry is first putted
+    auto s = PutLogData(prefix_logdata_);
+    if (!s.IsOK()) {
+      return s;
+    }
+  }
+  auto s = write_batch_.Put(cf, key, value);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("put key value to migrate batch failed, {}", s.ToString())};
+  }
+
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+Status BatchSender::Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key) {
+  auto s = write_batch_.Delete(cf, key);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("delete key from migrate batch failed, {}", s.ToString())};
+  }
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+Status BatchSender::PutLogData(const rocksdb::Slice &blob) {
+  auto s = write_batch_.PutLogData(blob);
+  if (!s.ok()) {
+    return {Status::NotOK, fmt::format("put log data to migrate batch failed, {}", s.ToString())};
+  }
+  pending_entries_++;
+  entries_num_++;
+  return Status::OK();
+}
+
+void BatchSender::SetPrefixLogData(const std::string &prefix_logdata) { prefix_logdata_ = prefix_logdata; }
+
+Status BatchSender::Send() {
+  if (pending_entries_ == 0) {
+    return Status::OK();
+  }
+
+  auto s = sendBatchSetCmd(slot_, dst_redis_context_, write_batch_);
+  if (!s.IsOK()) {
+    return {Status::NotOK, fmt::format("BATCHSET command failed, {}", s.Msg())};
+  }
+
+  sent_bytes_ += write_batch_.GetDataSize();
+  sent_batches_num_++;
+  pending_entries_ = 0;
+  write_batch_.Clear();
+  return Status::OK();
+}
+
+Status BatchSender::sendBatchSetCmd(int16_t slot, redisContext *redis_context, const rocksdb::WriteBatch &write_batch) {
+  if (redis_context == nullptr) {
+    return {Status::NotOK, "redis context is null"};
+  }
+
+  auto *reply = static_cast<redisReply *>(
+      redisCommand(redis_context, "BATCHSET %d %b", slot, write_batch.Data().c_str(), write_batch.GetDataSize()));
+  auto exit = MakeScopeExit([reply] {
+    if (reply != nullptr) {
+      freeReplyObject(reply);
+    }
+  });
+
+  if (redis_context->err != 0) {
+    return {Status::NotOK, std::string(redis_context->errstr)};
+  }
+
+  if (reply == nullptr) {
+    return {Status::NotOK, "get null reply"};
+  }
+
+  if (reply->type == REDIS_REPLY_ERROR) {
+    auto error_str = std::string(reply->str);
+    return {Status::NotOK, error_str};

Review Comment:
   ```suggestion
       return {Status::NotOK, reply->str};
   ```





[GitHub] [kvrocks] caipengbo commented on a diff in pull request #1534: Slot migration based on raw key value

Posted by "caipengbo (via GitHub)" <gi...@apache.org>.
caipengbo commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1260638782


##########
kvrocks.conf:
##########
@@ -501,7 +501,17 @@ compaction-checker-range 0-7
 #
 # rename-command KEYS ""
 
-################################ MIGRATE #####################################
+################################ MIGRATION #####################################
+# Slot migration supports two ways:
+# - redis_command: Migrate data by redis serialization protocol(RESP).
+# - raw_key_value: Migrate the raw key value data of the storage engine directly.

Review Comment:
   I want to distinguish the item value from the item name





[GitHub] [kvrocks] PragmaTwice commented on a diff in pull request #1534: Slot migration based on raw key value

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1260633129


##########
src/cluster/slot_migrate.h:
##########
@@ -169,6 +195,7 @@ class SlotMigrator : public redis::Database {
   std::string dst_ip_;
   int dst_port_ = -1;
   UniqueFD dst_fd_;
+  redisContext *dst_redis_context_ = nullptr;

Review Comment:
   It seems the redisFree call for deallocating dst_redis_context_ is missing? 





[GitHub] [kvrocks] git-hulk commented on pull request #1534: Support rapid slot migration based on raw key value

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk commented on PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#issuecomment-1670541140

   Yes, we need to think this through before introducing a new library. Kvrocks has more and more dependencies, and we need to keep an eye on this so that it does not get out of control.




[GitHub] [kvrocks] git-hulk commented on a diff in pull request #1534: Support rapid slot migration based on raw key value

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk commented on code in PR #1534:
URL: https://github.com/apache/kvrocks/pull/1534#discussion_r1286023076


##########
src/cluster/batch_sender.cc:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "batch_sender.h"
+
+#include "hiredis.h"
+#include "scope_exit.h"
+
+Status BatchSender::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value) {
+  if (pending_entries_ == 0 && !prefix_logdata_.empty()) {

Review Comment:
   Why only put the log data for the first entry?



##########
src/cluster/migrate_iterator.cc:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "cluster/migrate_iterator.h"
+
+#include "db_util.h"
+#include "storage/redis_db.h"
+
+MigrateIterator::MigrateIterator(engine::Storage *storage, const rocksdb::ReadOptions &read_options)
+    : metadata_cf_(storage->GetCFHandle(engine::kMetadataColumnFamilyName)),
+      subkey_cf_(storage->GetCFHandle(engine::kSubkeyColumnFamilyName)),
+      zset_score_cf_(storage->GetCFHandle(engine::kZSetScoreColumnFamilyName)),
+      stream_cf_(storage->GetCFHandle(engine::kStreamColumnFamilyName)),
+      metadata_iter_(util::UniqueIterator(storage, read_options, metadata_cf_)),
+      subdata_iter_(util::UniqueIterator(storage, read_options, subkey_cf_)),
+      stream_iter_(util::UniqueIterator(storage, read_options, stream_cf_)),
+      valid_(false),
+      metadata_(RedisType::kRedisNone, false) {}
+
+bool MigrateIterator::Valid() const { return valid_; }
+
+void MigrateIterator::Seek(const rocksdb::Slice &target) {
+  items_.clear();
+  log_data_.clear();
+  metadata_iter_->Reset();
+  subdata_iter_->Reset();
+  stream_iter_->Reset();
+  metakey_prefix_.clear();
+
+  metadata_iter_->Seek(target);
+  valid_ = metadata_iter_->Valid() && metadata_iter_->key().starts_with(target);
+  if (valid_) {
+    metakey_prefix_ = target.ToString();
+    findMetaData();
+  }
+}
+
+void MigrateIterator::Next() {
+  assert(valid_);
+  valid_ = false;
+  items_.clear();
+
+  if (subdata_iter_->Valid()) {
+    subdata_iter_->Next();
+    valid_ = subdata_iter_->Valid() && subdata_iter_->key().starts_with(subkey_prefix_);
+    if (!valid_) {
+      subdata_iter_->Reset();
+    }
+  } else if (stream_iter_->Valid()) {
+    stream_iter_->Next();
+    valid_ = stream_iter_->Valid() && stream_iter_->key().starts_with(subkey_prefix_);
+    if (!valid_) {
+      stream_iter_->Reset();
+    }
+  }
+
+  if (valid_) {
+    findSubData();
+  } else if (metadata_iter_->Valid()) {
+    metadata_iter_->Next();
+    valid_ = metadata_iter_->Valid() && metadata_iter_->key().starts_with(metakey_prefix_);
+    if (valid_) {
+      findMetaData();
+    }
+  }
+}
+
+const std::vector<MigrateItem> &MigrateIterator::GetItems() const {
+  assert(valid_);
+  return items_;
+}
+
+std::string MigrateIterator::GetLogData() const {
+  assert(valid_);
+  return log_data_;
+}
+
+void MigrateIterator::findMetaData() {
+  assert(metadata_iter_->Valid());
+  Metadata metadata(kRedisNone, false /* generate_version */);
+  metadata.Decode(metadata_iter_->value().ToString());
+  RedisType redis_type = metadata.Type();
+  metadata_ = metadata;
+
+  redis::WriteBatchLogData log_data(redis_type);
+  if (redis_type == RedisType::kRedisList) {
+    log_data.SetArguments({std::to_string(RedisCommand::kRedisCmdRPush)});
+  }
+
+  log_data_ = log_data.Encode();
+
+  items_.push_back(MigrateItem{metadata_cf_, metadata_iter_->key().ToString(), metadata_iter_->value().ToString()});
+
+  if (redis_type == RedisType::kRedisStream) {

Review Comment:
   Can we move this reset inside `initSubData`?



##########
src/cluster/migrate_iterator.cc:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "cluster/migrate_iterator.h"
+
+#include "db_util.h"
+#include "storage/redis_db.h"
+
+MigrateIterator::MigrateIterator(engine::Storage *storage, const rocksdb::ReadOptions &read_options)
+    : metadata_cf_(storage->GetCFHandle(engine::kMetadataColumnFamilyName)),
+      subkey_cf_(storage->GetCFHandle(engine::kSubkeyColumnFamilyName)),
+      zset_score_cf_(storage->GetCFHandle(engine::kZSetScoreColumnFamilyName)),
+      stream_cf_(storage->GetCFHandle(engine::kStreamColumnFamilyName)),
+      metadata_iter_(util::UniqueIterator(storage, read_options, metadata_cf_)),
+      subdata_iter_(util::UniqueIterator(storage, read_options, subkey_cf_)),
+      stream_iter_(util::UniqueIterator(storage, read_options, stream_cf_)),
+      valid_(false),
+      metadata_(RedisType::kRedisNone, false) {}
+
+bool MigrateIterator::Valid() const { return valid_; }
+
+void MigrateIterator::Seek(const rocksdb::Slice &target) {
+  items_.clear();
+  log_data_.clear();
+  metadata_iter_->Reset();
+  subdata_iter_->Reset();
+  stream_iter_->Reset();
+  metakey_prefix_.clear();
+
+  metadata_iter_->Seek(target);
+  valid_ = metadata_iter_->Valid() && metadata_iter_->key().starts_with(target);
+  if (valid_) {
+    metakey_prefix_ = target.ToString();
+    findMetaData();
+  }
+}
+
+void MigrateIterator::Next() {
+  assert(valid_);
+  valid_ = false;
+  items_.clear();
+
+  if (subdata_iter_->Valid()) {
+    subdata_iter_->Next();
+    valid_ = subdata_iter_->Valid() && subdata_iter_->key().starts_with(subkey_prefix_);
+    if (!valid_) {
+      subdata_iter_->Reset();
+    }
+  } else if (stream_iter_->Valid()) {
+    stream_iter_->Next();
+    valid_ = stream_iter_->Valid() && stream_iter_->key().starts_with(subkey_prefix_);
+    if (!valid_) {
+      stream_iter_->Reset();
+    }
+  }
+
+  if (valid_) {
+    findSubData();
+  } else if (metadata_iter_->Valid()) {
+    metadata_iter_->Next();
+    valid_ = metadata_iter_->Valid() && metadata_iter_->key().starts_with(metakey_prefix_);
+    if (valid_) {
+      findMetaData();
+    }
+  }
+}
+
+const std::vector<MigrateItem> &MigrateIterator::GetItems() const {
+  assert(valid_);
+  return items_;
+}
+
+std::string MigrateIterator::GetLogData() const {
+  assert(valid_);
+  return log_data_;
+}
+
+void MigrateIterator::findMetaData() {

Review Comment:
   findMetaData => findMetadata



##########
src/cluster/migrate_iterator.cc:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "cluster/migrate_iterator.h"
+
+#include "db_util.h"
+#include "storage/redis_db.h"
+
+MigrateIterator::MigrateIterator(engine::Storage *storage, const rocksdb::ReadOptions &read_options)
+    : metadata_cf_(storage->GetCFHandle(engine::kMetadataColumnFamilyName)),
+      subkey_cf_(storage->GetCFHandle(engine::kSubkeyColumnFamilyName)),
+      zset_score_cf_(storage->GetCFHandle(engine::kZSetScoreColumnFamilyName)),
+      stream_cf_(storage->GetCFHandle(engine::kStreamColumnFamilyName)),
+      metadata_iter_(util::UniqueIterator(storage, read_options, metadata_cf_)),
+      subdata_iter_(util::UniqueIterator(storage, read_options, subkey_cf_)),
+      stream_iter_(util::UniqueIterator(storage, read_options, stream_cf_)),
+      valid_(false),
+      metadata_(RedisType::kRedisNone, false) {}
+
+bool MigrateIterator::Valid() const { return valid_; }
+
+void MigrateIterator::Seek(const rocksdb::Slice &target) {
+  items_.clear();
+  log_data_.clear();
+  metadata_iter_->Reset();
+  subdata_iter_->Reset();
+  stream_iter_->Reset();
+  metakey_prefix_.clear();
+
+  metadata_iter_->Seek(target);
+  valid_ = metadata_iter_->Valid() && metadata_iter_->key().starts_with(target);
+  if (valid_) {
+    metakey_prefix_ = target.ToString();
+    findMetaData();
+  }
+}
+
+void MigrateIterator::Next() {
+  assert(valid_);
+  valid_ = false;
+  items_.clear();
+
+  if (subdata_iter_->Valid()) {
+    subdata_iter_->Next();
+    valid_ = subdata_iter_->Valid() && subdata_iter_->key().starts_with(subkey_prefix_);
+    if (!valid_) {
+      subdata_iter_->Reset();
+    }
+  } else if (stream_iter_->Valid()) {
+    stream_iter_->Next();
+    valid_ = stream_iter_->Valid() && stream_iter_->key().starts_with(subkey_prefix_);
+    if (!valid_) {
+      stream_iter_->Reset();
+    }
+  }
+
+  if (valid_) {
+    findSubData();
+  } else if (metadata_iter_->Valid()) {
+    metadata_iter_->Next();
+    valid_ = metadata_iter_->Valid() && metadata_iter_->key().starts_with(metakey_prefix_);
+    if (valid_) {
+      findMetaData();
+    }
+  }
+}
+
+const std::vector<MigrateItem> &MigrateIterator::GetItems() const {
+  assert(valid_);
+  return items_;
+}
+
+std::string MigrateIterator::GetLogData() const {
+  assert(valid_);
+  return log_data_;
+}
+
+void MigrateIterator::findMetaData() {
+  assert(metadata_iter_->Valid());
+  Metadata metadata(kRedisNone, false /* generate_version */);
+  metadata.Decode(metadata_iter_->value().ToString());
+  RedisType redis_type = metadata.Type();
+  metadata_ = metadata;
+
+  redis::WriteBatchLogData log_data(redis_type);
+  if (redis_type == RedisType::kRedisList) {
+    log_data.SetArguments({std::to_string(RedisCommand::kRedisCmdRPush)});
+  }
+
+  log_data_ = log_data.Encode();
+
+  items_.push_back(MigrateItem{metadata_cf_, metadata_iter_->key().ToString(), metadata_iter_->value().ToString()});
+
+  if (redis_type == RedisType::kRedisStream) {
+    stream_iter_->Reset();
+  } else {
+    subdata_iter_->Reset();

Review Comment:
   subdata_iter_ => subkey_iter_, to stay consistent with the column family name


