You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2022/03/10 07:27:02 UTC
[incubator-pegasus] branch master updated: feat: Using multi_set in shell tool to speedup data copy (#920)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 414d1b9 feat: Using multi_set in shell tool to speedup data copy (#920)
414d1b9 is described below
commit 414d1b9235740273862e2e8a6ce61be9715ecf6f
Author: liguohao <48...@users.noreply.github.com>
AuthorDate: Thu Mar 10 15:26:54 2022 +0800
feat: Using multi_set in shell tool to speedup data copy (#920)
---
src/shell/command_helper.h | 93 +++++++++++++++++++++++++++++++++-
src/shell/commands/data_operations.cpp | 76 +++++++++++++++++++++++++--
src/shell/config.ini | 2 +-
src/shell/main.cpp | 6 +--
4 files changed, 166 insertions(+), 11 deletions(-)
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index e1912b2..53062a3 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -35,6 +35,7 @@
#include <dsn/dist/replication/mutation_log_tool.h>
#include <dsn/perf_counter/perf_counter_utils.h>
#include <dsn/utility/string_view.h>
+#include <dsn/utility/synchronize.h>
#include <dsn/utils/time_utils.h>
#include <rrdb/rrdb.code.definition.h>
@@ -67,7 +68,8 @@ enum scan_data_operator
SCAN_COPY,
SCAN_CLEAR,
SCAN_COUNT,
- SCAN_GEN_GEO
+ SCAN_GEN_GEO,
+ SCAN_AND_MULTI_SET
};
class top_container
{
@@ -148,6 +150,12 @@ struct scan_data_context
bool count_hash_key;
std::string last_hash_key;
std::atomic_long split_hash_key_count;
+
+ long data_count;
+ uint32_t multi_ttl_seconds;
+ std::unordered_map<std::string, std::map<std::string, std::string>> multi_kvs;
+ dsn::utils::semaphore sema;
+
scan_data_context(scan_data_operator op_,
int split_id_,
int max_batch_count_,
@@ -156,6 +164,7 @@ struct scan_data_context
pegasus::pegasus_client *client_,
pegasus::geo::geo_client *geoclient_,
std::atomic_bool *error_occurred_,
+ int max_multi_set_concurrency = 100,
bool stat_size_ = false,
std::shared_ptr<rocksdb::Statistics> statistics_ = nullptr,
int top_count_ = 0,
@@ -179,7 +188,10 @@ struct scan_data_context
top_count(top_count_),
top_rows(top_count_),
count_hash_key(count_hash_key_),
- split_hash_key_count(0)
+ split_hash_key_count(0),
+ data_count(0),
+ multi_ttl_seconds(0),
+ sema(max_multi_set_concurrency)
{
// max_batch_count should > 1 because scan may be terminated
// when split_request_count = 1
@@ -271,6 +283,83 @@ inline int compute_ttl_seconds(uint32_t expire_ts_seconds, bool &ts_expired)
return 0;
}
+inline void batch_execute_multi_set(scan_data_context *context)
+{
+ for (const auto &kv : context->multi_kvs) {
+ // block until the number of in-flight async_multi_set calls is below max_multi_set_concurrency
+ context->sema.wait();
+ int multi_size = kv.second.size();
+ context->client->async_multi_set(
+ kv.first,
+ kv.second,
+ [context, multi_size](int err, pegasus::pegasus_client::internal_info &&info) {
+ if (err != pegasus::PERR_OK) {
+ if (!context->split_completed.exchange(true)) {
+ fprintf(stderr,
+ "ERROR: split[%d] async_multi_set failed: %s\n",
+ context->split_id,
+ context->client->get_error_string(err));
+ context->error_occurred->store(true);
+ }
+ } else {
+ context->split_rows += multi_size;
+ }
+ context->sema.signal();
+ },
+ context->timeout_ms,
+ context->multi_ttl_seconds);
+ }
+ context->multi_kvs.clear();
+ context->data_count = 0;
+}
+// copy data by async_multi_set
+inline void scan_multi_data_next(scan_data_context *context)
+{
+ if (!context->split_completed.load() && !context->error_occurred->load()) {
+ context->scanner->async_next([context](int ret,
+ std::string &&hash_key,
+ std::string &&sort_key,
+ std::string &&value,
+ pegasus::pegasus_client::internal_info &&info,
+ uint32_t expire_ts_seconds) {
+ if (ret == pegasus::PERR_OK) {
+ if (validate_filter(context, sort_key, value)) {
+ bool ts_expired = false;
+ int ttl_seconds = 0;
+ ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
+ if (!ts_expired) {
+ context->data_count++;
+ if (context->multi_kvs.find(hash_key) == context->multi_kvs.end()) {
+ context->multi_kvs.emplace(hash_key,
+ std::map<std::string, std::string>());
+ }
+ if (context->multi_ttl_seconds < ttl_seconds || ttl_seconds == 0) {
+ context->multi_ttl_seconds = ttl_seconds;
+ }
+ context->multi_kvs[hash_key].emplace(std::move(sort_key), std::move(value));
+
+ if (context->data_count >= context->max_batch_count) {
+ batch_execute_multi_set(context);
+ }
+ }
+ }
+ scan_multi_data_next(context);
+ } else if (ret == pegasus::PERR_SCAN_COMPLETE) {
+ batch_execute_multi_set(context);
+ context->split_completed.store(true);
+ } else {
+ if (!context->split_completed.exchange(true)) {
+ fprintf(stderr,
+ "ERROR: split[%d] scan next failed: %s\n",
+ context->split_id,
+ context->client->get_error_string(ret));
+ context->error_occurred->store(true);
+ }
+ }
+ });
+ }
+}
+
inline void scan_data_next(scan_data_context *context)
{
while (!context->split_completed.load() && !context->error_occurred->load() &&
diff --git a/src/shell/commands/data_operations.cpp b/src/shell/commands/data_operations.cpp
index 90d9703..d8cd3de 100644
--- a/src/shell/commands/data_operations.cpp
+++ b/src/shell/commands/data_operations.cpp
@@ -1538,6 +1538,7 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
{"sort_key_filter_pattern", required_argument, 0, 'y'},
{"value_filter_type", required_argument, 0, 'v'},
{"value_filter_pattern", required_argument, 0, 'z'},
+ {"max_multi_set_concurrency", required_argument, 0, 'm'},
{"no_overwrite", no_argument, 0, 'n'},
{"no_value", no_argument, 0, 'i'},
{"geo_data", no_argument, 0, 'g'},
@@ -1549,9 +1550,11 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
std::string target_geo_app_name;
int32_t partition = -1;
int max_batch_count = 500;
+ int max_multi_set_concurrency = 20;
int timeout_ms = sc->timeout_ms;
bool is_geo_data = false;
bool no_overwrite = false;
+ bool use_multi_set = false;
std::string hash_key_filter_type_name("no_filter");
std::string sort_key_filter_type_name("no_filter");
pegasus::pegasus_client::filter_type sort_key_filter_type =
@@ -1568,7 +1571,7 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
int option_index = 0;
int c;
c = getopt_long(
- args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:nige", long_options, &option_index);
+ args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:m:o:nigeu", long_options, &option_index);
if (c == -1)
break;
switch (c) {
@@ -1634,6 +1637,18 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
case 'z':
value_filter_pattern = unescape_str(optarg);
break;
+ case 'm':
+ if (!dsn::buf2int32(optarg, max_multi_set_concurrency)) {
+ fprintf(stderr, "ERROR: parse %s as max_multi_set_concurrency failed\n", optarg);
+ return false;
+ }
+ break;
+ case 'o':
+ if (!dsn::buf2int32(optarg, options.batch_size)) {
+ fprintf(stderr, "ERROR: parse %s as scan_option_batch_size failed\n", optarg);
+ return false;
+ }
+ break;
case 'n':
no_overwrite = true;
break;
@@ -1646,6 +1661,9 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
case 'e':
options.return_expire_ts = false;
break;
+ case 'u':
+ use_multi_set = true;
+ break;
default:
return false;
}
@@ -1676,6 +1694,16 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
return false;
}
+ if (max_multi_set_concurrency <= 0) {
+ fprintf(stderr, "ERROR: max_multi_set_concurrency should be greater than 0\n");
+ return false;
+ }
+
+ if (use_multi_set && no_overwrite) {
+ fprintf(stderr, "ERROR: copy with multi_set does not support no_overwrite!\n");
+ return false;
+ }
+
fprintf(stderr, "INFO: source_cluster_name = %s\n", sc->pg_client->get_cluster_name());
fprintf(stderr, "INFO: source_app_name = %s\n", sc->pg_client->get_app_name());
fprintf(stderr, "INFO: target_cluster_name = %s\n", target_cluster_name.c_str());
@@ -1683,6 +1711,11 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
if (is_geo_data) {
fprintf(stderr, "INFO: target_geo_app_name = %s\n", target_geo_app_name.c_str());
}
+ if (use_multi_set) {
+ fprintf(stderr,
+ "INFO: copy uses async_multi_set, max_multi_set_concurrency = %d\n",
+ max_multi_set_concurrency);
+ }
fprintf(stderr,
"INFO: partition = %s\n",
partition >= 0 ? boost::lexical_cast<std::string>(partition).c_str() : "all");
@@ -1776,21 +1809,50 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
std::atomic_bool error_occurred(false);
std::vector<std::unique_ptr<scan_data_context>> contexts;
+
+ scan_data_operator op = SCAN_COPY;
+ if (is_geo_data) {
+ op = SCAN_GEN_GEO;
+ } else if (use_multi_set) {
+ fprintf(stderr,
+ "WARN: using multi_set will lose the accurate ttl time per value! "
+ "The ttl will be assigned the max value within each batch of data.\n");
+ op = SCAN_AND_MULTI_SET;
+ // the threadpool worker_count should be greater than the source app scanner count
+ int worker_count = dsn_config_get_value_int64("threadpool.THREAD_POOL_DEFAULT",
+ "worker_count",
+ 0,
+ "get THREAD_POOL_DEFAULT worker_count.");
+ fprintf(stderr, "INFO: THREAD_POOL_DEFAULT worker_count = %d\n", worker_count);
+ if (worker_count <= split_count) {
+ fprintf(stderr,
+ "INFO: THREAD_POOL_DEFAULT worker_count should be greater than the source app "
+ "scanner count %d\n",
+ split_count);
+ return true;
+ }
+ }
+
for (int i = 0; i < split_count; i++) {
- scan_data_context *context = new scan_data_context(is_geo_data ? SCAN_GEN_GEO : SCAN_COPY,
+ scan_data_context *context = new scan_data_context(op,
i,
max_batch_count,
timeout_ms,
scanners[i],
target_client,
target_geo_client.get(),
- &error_occurred);
+ &error_occurred,
+ max_multi_set_concurrency);
context->set_sort_key_filter(sort_key_filter_type, sort_key_filter_pattern);
context->set_value_filter(value_filter_type, value_filter_pattern);
if (no_overwrite)
context->set_no_overwrite();
contexts.emplace_back(context);
- dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context));
+ if (op == SCAN_AND_MULTI_SET) {
+ dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_multi_data_next, context));
+ } else {
+ dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context));
+ }
}
// wait thread complete
@@ -1803,8 +1865,11 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
long cur_total_rows = 0;
for (int i = 0; i < split_count; i++) {
cur_total_rows += contexts[i]->split_rows.load();
- if (contexts[i]->split_request_count.load() == 0)
+ if (op != SCAN_AND_MULTI_SET && contexts[i]->split_request_count.load() == 0) {
completed_split_count++;
+ } else if (contexts[i]->split_completed.load()) {
+ completed_split_count++;
+ }
}
if (error_occurred.load()) {
fprintf(stderr,
@@ -2370,6 +2435,7 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
sc->pg_client,
nullptr,
&error_occurred,
+ 0,
stat_size,
statistics,
top_count,
diff --git a/src/shell/config.ini b/src/shell/config.ini
index 62a649f..b4adc08 100644
--- a/src/shell/config.ini
+++ b/src/shell/config.ini
@@ -49,7 +49,7 @@ worker_priority = THREAD_xPRIORITY_NORMAL
[threadpool.THREAD_POOL_DEFAULT]
name = default
-worker_count = 8
+worker_count = 20
[threadpool.THREAD_POOL_META_SERVER]
name = meta_server
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 8be8555..308d54c 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -275,9 +275,9 @@ static command_executor commands[] = {
"[-s|--sort_key_filter_type anywhere|prefix|postfix|exact] "
"[-y|--sort_key_filter_pattern str] "
"[-v|--value_filter_type anywhere|prefix|postfix|exact] "
- "[-z|--value_filter_pattern str] "
- "[-n|--no_overwrite] [-i|--no_value] [-g|--geo_data] "
- "[-e|--no_ttl]",
+ "[-z|--value_filter_pattern str] [-m|--max_multi_set_concurrency] "
+ "[-o|--scan_option_batch_size] [-e|--no_ttl] "
+ "[-n|--no_overwrite] [-i|--no_value] [-g|--geo_data] [-u|--use_multi_set]",
data_operations,
},
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org