You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pegasus.apache.org by GitBox <gi...@apache.org> on 2022/06/15 06:32:23 UTC

[GitHub] [incubator-pegasus] GehaFearless opened a new pull request, #1004: fix: fix empty hashkey copy_data

GehaFearless opened a new pull request, #1004:
URL: https://github.com/apache/incubator-pegasus/pull/1004

   ### What problem does this PR solve? <!--add issue link with summary if exists-->
   Fix multi_set-based copy_data when the hash key is empty.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1004: fix: fix empty hashkey copy_data

Posted by GitBox <gi...@apache.org>.
acelyc111 commented on code in PR #1004:
URL: https://github.com/apache/incubator-pegasus/pull/1004#discussion_r901607533


##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/fmt_logging.h>
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+using std::map;
+using std::string;
+using std::vector;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const string empty_hash_key = "";
+static const string srouce_app_name = "copy_data_source_table";
+static const string destination_app_name = "copy_data_destination_table";
+static char buffer[256];
+static map<string, map<string, string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    string hash_key;
+    string sort_key;
+    string value;
+    map<string, map<string, string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug_f("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return string(buffer + pos, length);
+    } else {
+        return string(buffer + pos, sizeof(buffer) - pos) +
+               string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug_f("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    string hash_key;
+    string sort_key;
+    string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug_f("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug_f("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug_f("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPY)
+{
+    ddebug_f("TESTING_COPY_DATA, EMPTY HAS_HKEY COPY ....");

Review Comment:
   ```suggestion
       ddebug_f("TESTING_COPY_DATA, EMPTY HASH_KEY COPY ....");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1004: fix: fix empty hashkey copy_data

Posted by GitBox <gi...@apache.org>.
acelyc111 commented on code in PR #1004:
URL: https://github.com/apache/incubator-pegasus/pull/1004#discussion_r901298236


##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+using std::map;
+using std::string;
+using std::vector;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const string empty_hash_key = "";
+static const string srouce_app_name = "copy_data_source_table";
+static const string destination_app_name = "copy_data_destination_table";
+static char buffer[256];
+static map<string, map<string, string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    string hash_key;
+    string sort_key;
+    string value;
+    map<string, map<string, string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return string(buffer + pos, length);
+    } else {
+        return string(buffer + pos, sizeof(buffer) - pos) +
+               string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    string hash_key;
+    string sort_key;
+    string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HAS_HKEY COPY ....");

Review Comment:
   ```suggestion
       ddebug("TESTING_COPY_DATA, EMPTY HASH_KEY COPY ....");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1004: fix: fix empty hashkey copy_data

Posted by GitBox <gi...@apache.org>.
acelyc111 commented on code in PR #1004:
URL: https://github.com/apache/incubator-pegasus/pull/1004#discussion_r898803586


##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+
+using namespace ::pegasus;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const std::string empty_hash_key = "";
+static const std::string srouce_app_name = "copy_data_source";
+static const std::string destination_app_name = "copy_data_destination";
+static char buffer[256];
+static std::map<std::string, std::map<std::string, std::string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void check_and_put(std::map<std::string, std::map<std::string, std::string>> &data,

Review Comment:
   It seems some of this code duplicates code in other files under src/test/function_test/*. Could you please refactor that first?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1004: fix: fix empty hashkey copy_data

Posted by GitBox <gi...@apache.org>.
acelyc111 commented on code in PR #1004:
URL: https://github.com/apache/incubator-pegasus/pull/1004#discussion_r899865777


##########
src/test/function_test/utils.h:
##########
@@ -80,3 +80,127 @@ generate_sortkey_value_map(const std::vector<std::string> sortkeys,
     }
     return result;
 }
+
+inline void
+check_and_put(std::map<std::string, std::map<std::string, std::pair<std::string, uint32_t>>> &data,
+              const std::string &hash_key,
+              const std::string &sort_key,
+              const std::string &value,
+              uint32_t expire_ts_seconds)
+{
+    auto it1 = data.find(hash_key);
+    if (it1 != data.end()) {
+        auto it2 = it1->second.find(sort_key);
+        ASSERT_EQ(it1->second.end(), it2)
+            << "Duplicate: hash_key=" << hash_key << ", sort_key=" << sort_key
+            << ", old_value=" << it2->second.first << ", new_value=" << value
+            << ", old_expire_ts_seconds=" << it2->second.second
+            << ", new_expire_ts_seconds=" << expire_ts_seconds;
+    }
+    data[hash_key][sort_key] = std::pair<std::string, uint32_t>(value, expire_ts_seconds);
+}
+
+inline void check_and_put(std::map<std::string, std::map<std::string, std::string>> &data,
+                          const std::string &hash_key,
+                          const std::string &sort_key,
+                          const std::string &value)
+{
+    auto it1 = data.find(hash_key);
+    if (it1 != data.end()) {
+        auto it2 = it1->second.find(sort_key);
+        ASSERT_EQ(it1->second.end(), it2)
+            << "Duplicate: hash_key=" << hash_key << ", sort_key=" << sort_key
+            << ", old_value=" << it2->second << ", new_value=" << value;
+    }
+    data[hash_key][sort_key] = value;
+}
+
+inline void check_and_put(std::map<std::string, std::string> &data,

Review Comment:
   这个函数在哪里用了? (English: Where is this function used?)



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const std::string empty_hash_key = "";
+static const std::string srouce_app_name = "copy_data_source";
+static const std::string destination_app_name = "copy_data_destination";
+static char buffer[256];
+static std::map<std::string, std::map<std::string, std::string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    std::vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    std::map<std::string, std::map<std::string, std::string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const std::string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return std::string(buffer + pos, length);
+    } else {
+        return std::string(buffer + pos, sizeof(buffer) - pos) +
+               std::string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)

Review Comment:
   ```suggestion
   TEST_F(copy_data_test, EMPTY_HASH_KEY_COPY)
   ```
   ```suggestion
   TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)
   ```



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const std::string empty_hash_key = "";
+static const std::string srouce_app_name = "copy_data_source";
+static const std::string destination_app_name = "copy_data_destination";
+static char buffer[256];
+static std::map<std::string, std::map<std::string, std::string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    std::vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    std::map<std::string, std::map<std::string, std::string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const std::string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return std::string(buffer + pos, length);
+    } else {
+        return std::string(buffer + pos, sizeof(buffer) - pos) +
+               std::string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HAS_HKEY COPOY ....");
+
+    pegasus_client::scan_options options;
+    options.return_expire_ts = true;
+    std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
+    int ret = srouce_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
+    ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+                                     << srouce_client->get_error_string(ret);
+
+    ddebug("INFO: open source app scanner succeed, partition_count = %d\n",
+           (int)raw_scanners.size());
+
+    std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
+    for (auto raw_scanner : raw_scanners) {
+        ASSERT_NE(nullptr, raw_scanner);
+        scanners.push_back(raw_scanner->get_smart_wrapper());
+    }
+    raw_scanners.clear();
+
+    int split_count = scanners.size();
+    ddebug("INFO: prepare scanners succeed, split_count = %d\n", split_count);
+
+    std::atomic_bool error_occurred(false);
+    std::vector<std::unique_ptr<scan_data_context>> contexts;
+    std::unique_ptr<pegasus::geo::geo_client> geo_client;

Review Comment:
   Why is geo_client used here?



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const std::string empty_hash_key = "";
+static const std::string srouce_app_name = "copy_data_source";
+static const std::string destination_app_name = "copy_data_destination";
+static char buffer[256];
+static std::map<std::string, std::map<std::string, std::string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    std::vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    std::map<std::string, std::map<std::string, std::string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const std::string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return std::string(buffer + pos, length);
+    } else {
+        return std::string(buffer + pos, sizeof(buffer) - pos) +
+               std::string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HAS_HKEY COPOY ....");
+
+    pegasus_client::scan_options options;
+    options.return_expire_ts = true;
+    std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
+    int ret = srouce_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
+    ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+                                     << srouce_client->get_error_string(ret);
+
+    ddebug("INFO: open source app scanner succeed, partition_count = %d\n",
+           (int)raw_scanners.size());
+
+    std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
+    for (auto raw_scanner : raw_scanners) {
+        ASSERT_NE(nullptr, raw_scanner);
+        scanners.push_back(raw_scanner->get_smart_wrapper());
+    }
+    raw_scanners.clear();
+
+    int split_count = scanners.size();
+    ddebug("INFO: prepare scanners succeed, split_count = %d\n", split_count);
+
+    std::atomic_bool error_occurred(false);
+    std::vector<std::unique_ptr<scan_data_context>> contexts;
+    std::unique_ptr<pegasus::geo::geo_client> geo_client;
+
+    for (int i = 0; i < split_count; i++) {
+        scan_data_context *context = new scan_data_context(SCAN_AND_MULTI_SET,
+                                                           i,
+                                                           max_batch_count,
+                                                           timeout_ms,
+                                                           scanners[i],
+                                                           destination_client,
+                                                           geo_client.get(),
+                                                           &error_occurred,
+                                                           max_multi_set_concurrency);
+        contexts.emplace_back(context);
+        dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_multi_data_next, context));
+    }
+
+    // wait thread complete
+    int sleep_seconds = 0;
+    long last_total_rows = 0;
+    while (true) {

Review Comment:
   Why use a polling loop to detect when the copy has finished? It's too expensive.



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;

Review Comment:
   Better to also add using-declarations such as `using std::string;`, so that most of the `std::` prefixes can be omitted.



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const std::string empty_hash_key = "";
+static const std::string srouce_app_name = "copy_data_source";
+static const std::string destination_app_name = "copy_data_destination";
+static char buffer[256];
+static std::map<std::string, std::map<std::string, std::string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    std::vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    std::map<std::string, std::map<std::string, std::string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const std::string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return std::string(buffer + pos, length);
+    } else {
+        return std::string(buffer + pos, sizeof(buffer) - pos) +
+               std::string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HAS_HKEY COPOY ....");

Review Comment:
   ```suggestion
       ddebug("TESTING_COPY_DATA, EMPTY HASH_KEY COPY ....");
   ```
   ```suggestion
       ddebug("TESTING_COPY_DATA, EMPTY HAS_HKEY COPOY ....");
   ```



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const std::string empty_hash_key = "";
+static const std::string srouce_app_name = "copy_data_source";
+static const std::string destination_app_name = "copy_data_destination";
+static char buffer[256];
+static std::map<std::string, std::map<std::string, std::string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    std::vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    std::map<std::string, std::map<std::string, std::string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const std::string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return std::string(buffer + pos, length);
+    } else {
+        return std::string(buffer + pos, sizeof(buffer) - pos) +
+               std::string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HAS_HKEY COPOY ....");

Review Comment:
   ```suggestion
       ddebug("TESTING_COPY_DATA, EMPTY HASH_KEY COPY ....");
   ```
   ```suggestion
       ddebug("TESTING_COPY_DATA, EMPTY HAS_HKEY COPOY ....");
   ```



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const std::string empty_hash_key = "";
+static const std::string srouce_app_name = "copy_data_source";
+static const std::string destination_app_name = "copy_data_destination";
+static char buffer[256];
+static std::map<std::string, std::map<std::string, std::string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    std::vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    std::map<std::string, std::map<std::string, std::string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const std::string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return std::string(buffer + pos, length);
+    } else {
+        return std::string(buffer + pos, sizeof(buffer) - pos) +
+               std::string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HAS_HKEY COPOY ....");
+
+    pegasus_client::scan_options options;
+    options.return_expire_ts = true;
+    std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
+    int ret = srouce_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
+    ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+                                     << srouce_client->get_error_string(ret);
+
+    ddebug("INFO: open source app scanner succeed, partition_count = %d\n",
+           (int)raw_scanners.size());
+
+    std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
+    for (auto raw_scanner : raw_scanners) {
+        ASSERT_NE(nullptr, raw_scanner);
+        scanners.push_back(raw_scanner->get_smart_wrapper());
+    }
+    raw_scanners.clear();
+
+    int split_count = scanners.size();
+    ddebug("INFO: prepare scanners succeed, split_count = %d\n", split_count);
+
+    std::atomic_bool error_occurred(false);
+    std::vector<std::unique_ptr<scan_data_context>> contexts;
+    std::unique_ptr<pegasus::geo::geo_client> geo_client;
+
+    for (int i = 0; i < split_count; i++) {
+        scan_data_context *context = new scan_data_context(SCAN_AND_MULTI_SET,
+                                                           i,
+                                                           max_batch_count,
+                                                           timeout_ms,
+                                                           scanners[i],
+                                                           destination_client,
+                                                           geo_client.get(),
+                                                           &error_occurred,
+                                                           max_multi_set_concurrency);
+        contexts.emplace_back(context);
+        dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_multi_data_next, context));
+    }
+
+    // wait thread complete
+    int sleep_seconds = 0;
+    long last_total_rows = 0;
+    while (true) {

Review Comment:
   Why use a polling loop to detect when the copy has finished? It's too expensive.



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const std::string empty_hash_key = "";
+static const std::string srouce_app_name = "copy_data_source";
+static const std::string destination_app_name = "copy_data_destination";
+static char buffer[256];
+static std::map<std::string, std::map<std::string, std::string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    std::vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    std::map<std::string, std::map<std::string, std::string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const std::string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return std::string(buffer + pos, length);
+    } else {
+        return std::string(buffer + pos, sizeof(buffer) - pos) +
+               std::string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HAS_HKEY COPOY ....");
+
+    pegasus_client::scan_options options;
+    options.return_expire_ts = true;
+    std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
+    int ret = srouce_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
+    ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+                                     << srouce_client->get_error_string(ret);
+
+    ddebug("INFO: open source app scanner succeed, partition_count = %d\n",
+           (int)raw_scanners.size());
+
+    std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
+    for (auto raw_scanner : raw_scanners) {
+        ASSERT_NE(nullptr, raw_scanner);
+        scanners.push_back(raw_scanner->get_smart_wrapper());
+    }
+    raw_scanners.clear();
+
+    int split_count = scanners.size();
+    ddebug("INFO: prepare scanners succeed, split_count = %d\n", split_count);
+
+    std::atomic_bool error_occurred(false);
+    std::vector<std::unique_ptr<scan_data_context>> contexts;
+    std::unique_ptr<pegasus::geo::geo_client> geo_client;

Review Comment:
   Why use geo_client here?



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const std::string empty_hash_key = "";
+static const std::string srouce_app_name = "copy_data_source";
+static const std::string destination_app_name = "copy_data_destination";
+static char buffer[256];
+static std::map<std::string, std::map<std::string, std::string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    std::vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    std::map<std::string, std::map<std::string, std::string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const std::string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return std::string(buffer + pos, length);
+    } else {
+        return std::string(buffer + pos, sizeof(buffer) - pos) +
+               std::string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)

Review Comment:
   ```suggestion
   TEST_F(copy_data_test, EMPTY_HASH_KEY_COPY)
   ```
   ```suggestion
   TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)
   ```



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const std::string empty_hash_key = "";
+static const std::string srouce_app_name = "copy_data_source";
+static const std::string destination_app_name = "copy_data_destination";
+static char buffer[256];
+static std::map<std::string, std::map<std::string, std::string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    std::vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    std::map<std::string, std::map<std::string, std::string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const std::string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return std::string(buffer + pos, length);
+    } else {
+        return std::string(buffer + pos, sizeof(buffer) - pos) +
+               std::string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HAS_HKEY COPOY ....");
+
+    pegasus_client::scan_options options;
+    options.return_expire_ts = true;
+    std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
+    int ret = srouce_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
+    ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+                                     << srouce_client->get_error_string(ret);
+
+    ddebug("INFO: open source app scanner succeed, partition_count = %d\n",
+           (int)raw_scanners.size());
+
+    std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
+    for (auto raw_scanner : raw_scanners) {
+        ASSERT_NE(nullptr, raw_scanner);
+        scanners.push_back(raw_scanner->get_smart_wrapper());
+    }
+    raw_scanners.clear();
+
+    int split_count = scanners.size();
+    ddebug("INFO: prepare scanners succeed, split_count = %d\n", split_count);
+
+    std::atomic_bool error_occurred(false);
+    std::vector<std::unique_ptr<scan_data_context>> contexts;
+    std::unique_ptr<pegasus::geo::geo_client> geo_client;
+
+    for (int i = 0; i < split_count; i++) {

Review Comment:
   You should run the copy_data shell tool rather than calling the function directly; otherwise the tool itself is not covered by this test.



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;

Review Comment:
   It would be better to also add `using std::xxx;` declarations so that most of the `std::` prefixes can be omitted; xxx can be shared_ptr, string, map, etc.



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <geo/lib/geo_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const std::string empty_hash_key = "";
+static const std::string srouce_app_name = "copy_data_source";
+static const std::string destination_app_name = "copy_data_destination";
+static char buffer[256];
+static std::map<std::string, std::map<std::string, std::string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    std::vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    std::map<std::string, std::map<std::string, std::string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const std::string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return std::string(buffer + pos, length);
+    } else {
+        return std::string(buffer + pos, sizeof(buffer) - pos) +
+               std::string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    std::string hash_key;
+    std::string sort_key;
+    std::string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPOY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HAS_HKEY COPOY ....");
+
+    pegasus_client::scan_options options;
+    options.return_expire_ts = true;
+    std::vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
+    int ret = srouce_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
+    ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+                                     << srouce_client->get_error_string(ret);
+
+    ddebug("INFO: open source app scanner succeed, partition_count = %d\n",
+           (int)raw_scanners.size());
+
+    std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
+    for (auto raw_scanner : raw_scanners) {
+        ASSERT_NE(nullptr, raw_scanner);
+        scanners.push_back(raw_scanner->get_smart_wrapper());
+    }
+    raw_scanners.clear();
+
+    int split_count = scanners.size();
+    ddebug("INFO: prepare scanners succeed, split_count = %d\n", split_count);
+
+    std::atomic_bool error_occurred(false);
+    std::vector<std::unique_ptr<scan_data_context>> contexts;
+    std::unique_ptr<pegasus::geo::geo_client> geo_client;
+
+    for (int i = 0; i < split_count; i++) {

Review Comment:
   You should run the copy_data shell tool rather than calling the function directly; otherwise the tool is not covered by your test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1004: fix: fix empty hashkey copy_data

Posted by GitBox <gi...@apache.org>.
acelyc111 commented on code in PR #1004:
URL: https://github.com/apache/incubator-pegasus/pull/1004#discussion_r901302060


##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+using std::map;
+using std::string;
+using std::vector;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const string empty_hash_key = "";
+static const string srouce_app_name = "copy_data_source_table";
+static const string destination_app_name = "copy_data_destination_table";
+static char buffer[256];
+static map<string, map<string, string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    string hash_key;
+    string sort_key;
+    string value;
+    map<string, map<string, string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return string(buffer + pos, length);
+    } else {
+        return string(buffer + pos, sizeof(buffer) - pos) +
+               string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    string hash_key;
+    string sort_key;
+    string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HASH_KEY COPY ....");
+
+    pegasus_client::scan_options options;
+    options.return_expire_ts = true;
+    vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
+    int ret = srouce_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
+    ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+                                     << srouce_client->get_error_string(ret);
+
+    ddebug("INFO: open source app scanner succeed, partition_count = %d\n",

Review Comment:
   For newly added code, it's suggested to use ddebug_f.



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+using std::map;
+using std::string;
+using std::vector;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const string empty_hash_key = "";
+static const string srouce_app_name = "copy_data_source_table";
+static const string destination_app_name = "copy_data_destination_table";
+static char buffer[256];
+static map<string, map<string, string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    string hash_key;
+    string sort_key;
+    string value;
+    map<string, map<string, string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return string(buffer + pos, length);
+    } else {
+        return string(buffer + pos, sizeof(buffer) - pos) +
+               string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    string hash_key;
+    string sort_key;
+    string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HASH_KEY COPY ....");
+
+    pegasus_client::scan_options options;
+    options.return_expire_ts = true;
+    vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
+    int ret = srouce_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
+    ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+                                     << srouce_client->get_error_string(ret);
+
+    ddebug("INFO: open source app scanner succeed, partition_count = %d\n",
+           (int)raw_scanners.size());
+
+    vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
+    for (auto raw_scanner : raw_scanners) {
+        ASSERT_NE(nullptr, raw_scanner);
+        scanners.push_back(raw_scanner->get_smart_wrapper());
+    }
+    raw_scanners.clear();
+
+    int split_count = scanners.size();
+    ddebug("INFO: prepare scanners succeed, split_count = %d\n", split_count);
+
+    std::atomic_bool error_occurred(false);
+    vector<std::unique_ptr<scan_data_context>> contexts;
+
+    for (int i = 0; i < split_count; i++) {
+        scan_data_context *context = new scan_data_context(SCAN_AND_MULTI_SET,
+                                                           i,
+                                                           max_batch_count,
+                                                           timeout_ms,
+                                                           scanners[i],
+                                                           destination_client,
+                                                           nullptr,
+                                                           &error_occurred,
+                                                           max_multi_set_concurrency);
+        contexts.emplace_back(context);
+        dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_multi_data_next, context));
+    }
+
+    // wait thread complete
+    int sleep_seconds = 0;
+    while (sleep_seconds < 120) {
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        sleep_seconds++;
+        int completed_split_count = 0;
+        for (int i = 0; i < split_count; i++) {
+            if (contexts[i]->split_completed.load()) {
+                completed_split_count++;
+            }
+        }
+        if (completed_split_count == split_count)
+            break;

Review Comment:
   ```suggestion
           if (completed_split_count == split_count) {
               break;
           }
   ```



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+using std::map;
+using std::string;
+using std::vector;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const string empty_hash_key = "";
+static const string srouce_app_name = "copy_data_source_table";
+static const string destination_app_name = "copy_data_destination_table";
+static char buffer[256];
+static map<string, map<string, string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    string hash_key;
+    string sort_key;
+    string value;
+    map<string, map<string, string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return string(buffer + pos, length);
+    } else {
+        return string(buffer + pos, sizeof(buffer) - pos) +
+               string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    string hash_key;
+    string sort_key;
+    string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HASH_KEY COPY ....");
+
+    pegasus_client::scan_options options;
+    options.return_expire_ts = true;
+    vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
+    int ret = srouce_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
+    ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+                                     << srouce_client->get_error_string(ret);
+
+    ddebug("INFO: open source app scanner succeed, partition_count = %d\n",

Review Comment:
   It's confusing to add an `INFO:` prefix to a `debug` log.



##########
src/test/function_test/test_copy.cpp:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include <map>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <dsn/service_api_c.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <unistd.h>
+
+#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
+#include "global_env.h"
+#include "shell/commands.h"
+#include "utils.h"
+
+using namespace ::pegasus;
+using std::map;
+using std::string;
+using std::vector;
+
+extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
+static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+static const int max_batch_count = 500;
+static const int timeout_ms = 5000;
+static const int max_multi_set_concurrency = 20;
+static const int default_partitions = 4;
+static const string empty_hash_key = "";
+static const string srouce_app_name = "copy_data_source_table";
+static const string destination_app_name = "copy_data_destination_table";
+static char buffer[256];
+static map<string, map<string, string>> base_data;
+static pegasus_client *srouce_client;
+static pegasus_client *destination_client;
+
+static void verify_data()
+{
+    pegasus_client::scan_options options;
+    vector<pegasus_client::pegasus_scanner *> scanners;
+    int ret = destination_client->get_unordered_scanners(INT_MAX, options, scanners);
+    ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
+                      << destination_client->get_error_string(ret);
+
+    string hash_key;
+    string sort_key;
+    string value;
+    map<string, map<string, string>> data;
+    for (auto scanner : scanners) {
+        ASSERT_NE(nullptr, scanner);
+        while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
+            check_and_put(data, hash_key, sort_key, value);
+        }
+        ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
+                                           << destination_client->get_error_string(ret);
+        delete scanner;
+    }
+
+    compare(data, base_data);
+
+    ddebug("Data and base_data are the same.");
+}
+
+static void create_table_and_get_client()
+{
+    dsn::error_code err;
+    err = ddl_client->create_app(srouce_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    err = ddl_client->create_app(destination_app_name, "pegasus", default_partitions, 3, {}, false);
+    ASSERT_EQ(dsn::ERR_OK, err);
+
+    srouce_client = pegasus_client_factory::get_client("mycluster", srouce_app_name.c_str());
+    destination_client =
+        pegasus_client_factory::get_client("mycluster", destination_app_name.c_str());
+}
+
+// REQUIRED: 'buffer' has been filled with random chars.
+static const string random_string()
+{
+    int pos = random() % sizeof(buffer);
+    buffer[pos] = CCH[random() % sizeof(CCH)];
+    unsigned int length = random() % sizeof(buffer) + 1;
+    if (pos + length < sizeof(buffer)) {
+        return string(buffer + pos, length);
+    } else {
+        return string(buffer + pos, sizeof(buffer) - pos) +
+               string(buffer, length + pos - sizeof(buffer));
+    }
+}
+
+static void fill_data()
+{
+    ddebug("FILLING_DATA...");
+
+    srandom((unsigned int)time(nullptr));
+    for (auto &c : buffer) {
+        c = CCH[random() % sizeof(CCH)];
+    }
+
+    string hash_key;
+    string sort_key;
+    string value;
+    while (base_data[empty_hash_key].size() < 1000) {
+        sort_key = random_string();
+        value = random_string();
+        int ret = srouce_client->set(empty_hash_key, sort_key, value);
+        ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                << ", sort_key=" << sort_key
+                                << ", error=" << srouce_client->get_error_string(ret);
+        base_data[empty_hash_key][sort_key] = value;
+    }
+
+    while (base_data.size() < 500) {
+        hash_key = random_string();
+        while (base_data[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            int ret = srouce_client->set(hash_key, sort_key, value);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << srouce_client->get_error_string(ret);
+            base_data[hash_key][sort_key] = value;
+        }
+    }
+
+    ddebug("Data filled.");
+}
+
+class copy_data_test : public testing::Test
+{
+public:
+    static void SetUpTestCase()
+    {
+        ddebug("SetUp...");
+        create_table_and_get_client();
+        fill_data();
+    }
+
+    static void TearDownTestCase()
+    {
+        ddebug("TearDown...");
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+};
+
+TEST_F(copy_data_test, EMPTY_HASH_KEY_COPY)
+{
+    ddebug("TESTING_COPY_DATA, EMPTY HASH_KEY COPY ....");
+
+    pegasus_client::scan_options options;
+    options.return_expire_ts = true;
+    vector<pegasus::pegasus_client::pegasus_scanner *> raw_scanners;
+    int ret = srouce_client->get_unordered_scanners(INT_MAX, options, raw_scanners);
+    ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+                                     << srouce_client->get_error_string(ret);
+
+    ddebug("INFO: open source app scanner succeed, partition_count = %d\n",
+           (int)raw_scanners.size());
+
+    vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
+    for (auto raw_scanner : raw_scanners) {
+        ASSERT_NE(nullptr, raw_scanner);
+        scanners.push_back(raw_scanner->get_smart_wrapper());
+    }
+    raw_scanners.clear();
+
+    int split_count = scanners.size();
+    ddebug("INFO: prepare scanners succeed, split_count = %d\n", split_count);
+
+    std::atomic_bool error_occurred(false);
+    vector<std::unique_ptr<scan_data_context>> contexts;
+
+    for (int i = 0; i < split_count; i++) {
+        scan_data_context *context = new scan_data_context(SCAN_AND_MULTI_SET,
+                                                           i,
+                                                           max_batch_count,
+                                                           timeout_ms,
+                                                           scanners[i],
+                                                           destination_client,
+                                                           nullptr,
+                                                           &error_occurred,
+                                                           max_multi_set_concurrency);
+        contexts.emplace_back(context);
+        dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_multi_data_next, context));
+    }
+
+    // wait thread complete
+    int sleep_seconds = 0;
+    while (sleep_seconds < 120) {
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        sleep_seconds++;
+        int completed_split_count = 0;
+        for (int i = 0; i < split_count; i++) {
+            if (contexts[i]->split_completed.load()) {
+                completed_split_count++;
+            }
+        }
+        if (completed_split_count == split_count)
+            break;
+    }
+
+    ASSERT_EQ(false, error_occurred.load()) << "error occurred, processing terminated or timeout!";

Review Comment:
   ```suggestion
       ASSERT_FALSE(error_occurred.load()) << "error occurred, processing terminated or timeout!";
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 merged pull request #1004: fix: fix empty hashkey copy_data

Posted by GitBox <gi...@apache.org>.
acelyc111 merged PR #1004:
URL: https://github.com/apache/incubator-pegasus/pull/1004


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] GehaFearless commented on a diff in pull request #1004: fix: fix empty hashkey copy_data

Posted by GitBox <gi...@apache.org>.
GehaFearless commented on code in PR #1004:
URL: https://github.com/apache/incubator-pegasus/pull/1004#discussion_r899903862


##########
src/test/function_test/utils.h:
##########
@@ -80,3 +80,127 @@ generate_sortkey_value_map(const std::vector<std::string> sortkeys,
     }
     return result;
 }
+
+inline void
+check_and_put(std::map<std::string, std::map<std::string, std::pair<std::string, uint32_t>>> &data,
+              const std::string &hash_key,
+              const std::string &sort_key,
+              const std::string &value,
+              uint32_t expire_ts_seconds)
+{
+    auto it1 = data.find(hash_key);
+    if (it1 != data.end()) {
+        auto it2 = it1->second.find(sort_key);
+        ASSERT_EQ(it1->second.end(), it2)
+            << "Duplicate: hash_key=" << hash_key << ", sort_key=" << sort_key
+            << ", old_value=" << it2->second.first << ", new_value=" << value
+            << ", old_expire_ts_seconds=" << it2->second.second
+            << ", new_expire_ts_seconds=" << expire_ts_seconds;
+    }
+    data[hash_key][sort_key] = std::pair<std::string, uint32_t>(value, expire_ts_seconds);
+}
+
+inline void check_and_put(std::map<std::string, std::map<std::string, std::string>> &data,
+                          const std::string &hash_key,
+                          const std::string &sort_key,
+                          const std::string &value)
+{
+    auto it1 = data.find(hash_key);
+    if (it1 != data.end()) {
+        auto it2 = it1->second.find(sort_key);
+        ASSERT_EQ(it1->second.end(), it2)
+            << "Duplicate: hash_key=" << hash_key << ", sort_key=" << sort_key
+            << ", old_value=" << it2->second << ", new_value=" << value;
+    }
+    data[hash_key][sort_key] = value;
+}
+
+inline void check_and_put(std::map<std::string, std::string> &data,

Review Comment:
   Both test_scan.cpp and test_copy.cpp use these helpers, so they are placed in the shared utils.h.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org