You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2022/03/10 07:27:02 UTC

[incubator-pegasus] branch master updated: feat: Using multi_set in shell tool to speedup data copy (#920)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 414d1b9  feat: Using multi_set in shell tool to speedup data copy (#920)
414d1b9 is described below

commit 414d1b9235740273862e2e8a6ce61be9715ecf6f
Author: liguohao <48...@users.noreply.github.com>
AuthorDate: Thu Mar 10 15:26:54 2022 +0800

    feat: Using multi_set in shell tool to speedup data copy (#920)
---
 src/shell/command_helper.h             | 93 +++++++++++++++++++++++++++++++++-
 src/shell/commands/data_operations.cpp | 76 +++++++++++++++++++++++++--
 src/shell/config.ini                   |  2 +-
 src/shell/main.cpp                     |  6 +--
 4 files changed, 166 insertions(+), 11 deletions(-)

diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index e1912b2..53062a3 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -35,6 +35,7 @@
 #include <dsn/dist/replication/mutation_log_tool.h>
 #include <dsn/perf_counter/perf_counter_utils.h>
 #include <dsn/utility/string_view.h>
+#include <dsn/utility/synchronize.h>
 #include <dsn/utils/time_utils.h>
 
 #include <rrdb/rrdb.code.definition.h>
@@ -67,7 +68,8 @@ enum scan_data_operator
     SCAN_COPY,
     SCAN_CLEAR,
     SCAN_COUNT,
-    SCAN_GEN_GEO
+    SCAN_GEN_GEO,
+    SCAN_AND_MULTI_SET
 };
 class top_container
 {
@@ -148,6 +150,12 @@ struct scan_data_context
     bool count_hash_key;
     std::string last_hash_key;
     std::atomic_long split_hash_key_count;
+
+    long data_count;
+    uint32_t multi_ttl_seconds;
+    std::unordered_map<std::string, std::map<std::string, std::string>> multi_kvs;
+    dsn::utils::semaphore sema;
+
     scan_data_context(scan_data_operator op_,
                       int split_id_,
                       int max_batch_count_,
@@ -156,6 +164,7 @@ struct scan_data_context
                       pegasus::pegasus_client *client_,
                       pegasus::geo::geo_client *geoclient_,
                       std::atomic_bool *error_occurred_,
+                      int max_multi_set_concurrency = 100,
                       bool stat_size_ = false,
                       std::shared_ptr<rocksdb::Statistics> statistics_ = nullptr,
                       int top_count_ = 0,
@@ -179,7 +188,10 @@ struct scan_data_context
           top_count(top_count_),
           top_rows(top_count_),
           count_hash_key(count_hash_key_),
-          split_hash_key_count(0)
+          split_hash_key_count(0),
+          data_count(0),
+          multi_ttl_seconds(0),
+          sema(max_multi_set_concurrency)
     {
         // max_batch_count should > 1 because scan may be terminated
         // when split_request_count = 1
@@ -271,6 +283,83 @@ inline int compute_ttl_seconds(uint32_t expire_ts_seconds, bool &ts_expired)
     return 0;
 }
 
+inline void batch_execute_multi_set(scan_data_context *context)
+{
+    for (const auto &kv : context->multi_kvs) {
+        // wait until the number of in-flight multi_set requests is below max_multi_set_concurrency
+        context->sema.wait();
+        int multi_size = kv.second.size();
+        context->client->async_multi_set(
+            kv.first,
+            kv.second,
+            [context, multi_size](int err, pegasus::pegasus_client::internal_info &&info) {
+                if (err != pegasus::PERR_OK) {
+                    if (!context->split_completed.exchange(true)) {
+                        fprintf(stderr,
+                                "ERROR: split[%d] async_multi_set failed: %s\n",
+                                context->split_id,
+                                context->client->get_error_string(err));
+                        context->error_occurred->store(true);
+                    }
+                } else {
+                    context->split_rows += multi_size;
+                }
+                context->sema.signal();
+            },
+            context->timeout_ms,
+            context->multi_ttl_seconds);
+    }
+    context->multi_kvs.clear();
+    context->data_count = 0;
+}
+// copy data by async_multi_set
+inline void scan_multi_data_next(scan_data_context *context)
+{
+    if (!context->split_completed.load() && !context->error_occurred->load()) {
+        context->scanner->async_next([context](int ret,
+                                               std::string &&hash_key,
+                                               std::string &&sort_key,
+                                               std::string &&value,
+                                               pegasus::pegasus_client::internal_info &&info,
+                                               uint32_t expire_ts_seconds) {
+            if (ret == pegasus::PERR_OK) {
+                if (validate_filter(context, sort_key, value)) {
+                    bool ts_expired = false;
+                    int ttl_seconds = 0;
+                    ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
+                    if (!ts_expired) {
+                        context->data_count++;
+                        if (context->multi_kvs.find(hash_key) == context->multi_kvs.end()) {
+                            context->multi_kvs.emplace(hash_key,
+                                                       std::map<std::string, std::string>());
+                        }
+                        if (context->multi_ttl_seconds < ttl_seconds || ttl_seconds == 0) {
+                            context->multi_ttl_seconds = ttl_seconds;
+                        }
+                        context->multi_kvs[hash_key].emplace(std::move(sort_key), std::move(value));
+
+                        if (context->data_count >= context->max_batch_count) {
+                            batch_execute_multi_set(context);
+                        }
+                    }
+                }
+                scan_multi_data_next(context);
+            } else if (ret == pegasus::PERR_SCAN_COMPLETE) {
+                batch_execute_multi_set(context);
+                context->split_completed.store(true);
+            } else {
+                if (!context->split_completed.exchange(true)) {
+                    fprintf(stderr,
+                            "ERROR: split[%d] scan next failed: %s\n",
+                            context->split_id,
+                            context->client->get_error_string(ret));
+                    context->error_occurred->store(true);
+                }
+            }
+        });
+    }
+}
+
 inline void scan_data_next(scan_data_context *context)
 {
     while (!context->split_completed.load() && !context->error_occurred->load() &&
diff --git a/src/shell/commands/data_operations.cpp b/src/shell/commands/data_operations.cpp
index 90d9703..d8cd3de 100644
--- a/src/shell/commands/data_operations.cpp
+++ b/src/shell/commands/data_operations.cpp
@@ -1538,6 +1538,7 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
                                            {"sort_key_filter_pattern", required_argument, 0, 'y'},
                                            {"value_filter_type", required_argument, 0, 'v'},
                                            {"value_filter_pattern", required_argument, 0, 'z'},
+                                           {"max_multi_set_concurrency", required_argument, 0, 'm'},
                                            {"no_overwrite", no_argument, 0, 'n'},
                                            {"no_value", no_argument, 0, 'i'},
                                            {"geo_data", no_argument, 0, 'g'},
@@ -1549,9 +1550,11 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
     std::string target_geo_app_name;
     int32_t partition = -1;
     int max_batch_count = 500;
+    int max_multi_set_concurrency = 20;
     int timeout_ms = sc->timeout_ms;
     bool is_geo_data = false;
     bool no_overwrite = false;
+    bool use_multi_set = false;
     std::string hash_key_filter_type_name("no_filter");
     std::string sort_key_filter_type_name("no_filter");
     pegasus::pegasus_client::filter_type sort_key_filter_type =
@@ -1568,7 +1571,7 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
         int option_index = 0;
         int c;
         c = getopt_long(
-            args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:nige", long_options, &option_index);
+            args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:m:o:nigeu", long_options, &option_index);
         if (c == -1)
             break;
         switch (c) {
@@ -1634,6 +1637,18 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
         case 'z':
             value_filter_pattern = unescape_str(optarg);
             break;
+        case 'm':
+            if (!dsn::buf2int32(optarg, max_multi_set_concurrency)) {
+                fprintf(stderr, "ERROR: parse %s as max_multi_set_concurrency failed\n", optarg);
+                return false;
+            }
+            break;
+        case 'o':
+            if (!dsn::buf2int32(optarg, options.batch_size)) {
+                fprintf(stderr, "ERROR: parse %s as scan_option_batch_size failed\n", optarg);
+                return false;
+            }
+            break;
         case 'n':
             no_overwrite = true;
             break;
@@ -1646,6 +1661,9 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
         case 'e':
             options.return_expire_ts = false;
             break;
+        case 'u':
+            use_multi_set = true;
+            break;
         default:
             return false;
         }
@@ -1676,6 +1694,16 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
         return false;
     }
 
+    if (max_multi_set_concurrency <= 0) {
+        fprintf(stderr, "ERROR: max_multi_set_concurrency should be greater than 0\n");
+        return false;
+    }
+
+    if (use_multi_set && no_overwrite) {
+        fprintf(stderr, "ERROR: copy with multi_set does not support no_overwrite!\n");
+        return false;
+    }
+
     fprintf(stderr, "INFO: source_cluster_name = %s\n", sc->pg_client->get_cluster_name());
     fprintf(stderr, "INFO: source_app_name = %s\n", sc->pg_client->get_app_name());
     fprintf(stderr, "INFO: target_cluster_name = %s\n", target_cluster_name.c_str());
@@ -1683,6 +1711,11 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
     if (is_geo_data) {
         fprintf(stderr, "INFO: target_geo_app_name = %s\n", target_geo_app_name.c_str());
     }
+    if (use_multi_set) {
+        fprintf(stderr,
+                "INFO: copy uses async_multi_set, max_multi_set_concurrency = %d\n",
+                max_multi_set_concurrency);
+    }
     fprintf(stderr,
             "INFO: partition = %s\n",
             partition >= 0 ? boost::lexical_cast<std::string>(partition).c_str() : "all");
@@ -1776,21 +1809,50 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
 
     std::atomic_bool error_occurred(false);
     std::vector<std::unique_ptr<scan_data_context>> contexts;
+
+    scan_data_operator op = SCAN_COPY;
+    if (is_geo_data) {
+        op = SCAN_GEN_GEO;
+    } else if (use_multi_set) {
+        fprintf(stderr,
+                "WARN: used multi_set will lose accurate ttl time per value! "
+                "ttl time will be assign the max value of this batch data.\n");
+        op = SCAN_AND_MULTI_SET;
+        // the thread pool's worker_count should be greater than the source app's scanner count
+        int worker_count = dsn_config_get_value_int64("threadpool.THREAD_POOL_DEFAULT",
+                                                      "worker_count",
+                                                      0,
+                                                      "get THREAD_POOL_DEFAULT worker_count.");
+        fprintf(stderr, "INFO: THREAD_POOL_DEFAULT worker_count = %d\n", worker_count);
+        if (worker_count <= split_count) {
+            fprintf(stderr,
+                    "INFO: THREAD_POOL_DEFAULT worker_count should be greater than source app scanner "
+                    "count %d",
+                    split_count);
+            return true;
+        }
+    }
+
     for (int i = 0; i < split_count; i++) {
-        scan_data_context *context = new scan_data_context(is_geo_data ? SCAN_GEN_GEO : SCAN_COPY,
+        scan_data_context *context = new scan_data_context(op,
                                                            i,
                                                            max_batch_count,
                                                            timeout_ms,
                                                            scanners[i],
                                                            target_client,
                                                            target_geo_client.get(),
-                                                           &error_occurred);
+                                                           &error_occurred,
+                                                           max_multi_set_concurrency);
         context->set_sort_key_filter(sort_key_filter_type, sort_key_filter_pattern);
         context->set_value_filter(value_filter_type, value_filter_pattern);
         if (no_overwrite)
             context->set_no_overwrite();
         contexts.emplace_back(context);
-        dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context));
+        if (op == SCAN_AND_MULTI_SET) {
+            dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_multi_data_next, context));
+        } else {
+            dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context));
+        }
     }
 
     // wait thread complete
@@ -1803,8 +1865,11 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
         long cur_total_rows = 0;
         for (int i = 0; i < split_count; i++) {
             cur_total_rows += contexts[i]->split_rows.load();
-            if (contexts[i]->split_request_count.load() == 0)
+            if (op != SCAN_AND_MULTI_SET && contexts[i]->split_request_count.load() == 0) {
                 completed_split_count++;
+            } else if (contexts[i]->split_completed.load()) {
+                completed_split_count++;
+            }
         }
         if (error_occurred.load()) {
             fprintf(stderr,
@@ -2370,6 +2435,7 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
                                                            sc->pg_client,
                                                            nullptr,
                                                            &error_occurred,
+                                                           0,
                                                            stat_size,
                                                            statistics,
                                                            top_count,
diff --git a/src/shell/config.ini b/src/shell/config.ini
index 62a649f..b4adc08 100644
--- a/src/shell/config.ini
+++ b/src/shell/config.ini
@@ -49,7 +49,7 @@ worker_priority = THREAD_xPRIORITY_NORMAL
 
 [threadpool.THREAD_POOL_DEFAULT]
 name = default
-worker_count = 8
+worker_count = 20
 
 [threadpool.THREAD_POOL_META_SERVER]
 name = meta_server
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 8be8555..308d54c 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -275,9 +275,9 @@ static command_executor commands[] = {
         "[-s|--sort_key_filter_type anywhere|prefix|postfix|exact] "
         "[-y|--sort_key_filter_pattern str] "
         "[-v|--value_filter_type anywhere|prefix|postfix|exact] "
-        "[-z|--value_filter_pattern str] "
-        "[-n|--no_overwrite] [-i|--no_value] [-g|--geo_data] "
-        "[-e|--no_ttl]",
+        "[-z|--value_filter_pattern str] [-m|--max_multi_set_concurrency] "
+        "[-o|--scan_option_batch_size] [-e|--no_ttl] "
+        "[-n|--no_overwrite] [-i|--no_value] [-g|--geo_data] [-u|--use_multi_set]",
         data_operations,
     },
     {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org