You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/03/28 07:05:20 UTC
[incubator-pegasus] branch duplication_dev updated: fix(dup_enhancement#22): change batch send config by using rdsn config value (#930)
This is an automated email from the ASF dual-hosted git repository.
jiashuo pushed a commit to branch duplication_dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/duplication_dev by this push:
new 29a75d6 fix(dup_enhancement#22): change batch send config by using rdsn config value (#930)
29a75d6 is described below
commit 29a75d6891f8d20f97aa6faa47830fbe3d276c3c
Author: Jiashuo <js...@live.com>
AuthorDate: Mon Mar 28 15:05:14 2022 +0800
fix(dup_enhancement#22): change batch send config by using rdsn config value (#930)
---
rdsn | 2 +-
src/server/pegasus_mutation_duplicator.cpp | 16 ++++----------
src/server/pegasus_mutation_duplicator.h | 2 +-
.../test/pegasus_mutation_duplicator_test.cpp | 25 +++++++++++++++++-----
src/shell/main.cpp | 1 +
5 files changed, 27 insertions(+), 19 deletions(-)
diff --git a/rdsn b/rdsn
index 03f7b61..2be6eba 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 03f7b613d29c611844af831b9fcb2016620b9977
+Subproject commit 2be6eba9ab4cdbcfb1f3202bff9782a0c5bdf7f7
diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp
index 6fc4ecf..c2513a9 100644
--- a/src/server/pegasus_mutation_duplicator.cpp
+++ b/src/server/pegasus_mutation_duplicator.cpp
@@ -23,7 +23,6 @@
#include <dsn/cpp/message_utils.h>
#include <dsn/utility/chrono_literals.h>
-#include <dsn/dist/replication/duplication_common.h>
#include <rrdb/rrdb.client.h>
namespace dsn {
@@ -42,11 +41,6 @@ namespace replication {
namespace pegasus {
namespace server {
-DSN_DEFINE_uint32("pegasus",
- duplicate_log_batch_megabytes,
- 4,
- "send mutation log batch size per rpc");
-
using namespace dsn::literals::chrono_literals;
/*extern*/ uint64_t get_hash_from_request(dsn::task_code tc, const dsn::blob &data)
@@ -190,13 +184,11 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
_total_shipped_size = 0;
auto batch_request = dsn::make_unique<dsn::apps::duplicate_request>();
+ uint batch_count = 0;
uint batch_bytes = 0;
- int cur_count = 0;
-
for (auto mut : muts) {
// mut: 0=timestamp, 1=rpc_code, 2=raw_message
-
- cur_count++;
+ batch_count++;
dsn::task_code rpc_code = std::get<1>(mut);
dsn::blob raw_message = std::get<2>(mut);
auto dreq = dsn::make_unique<dsn::apps::duplicate_request>();
@@ -216,8 +208,8 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
batch_bytes += raw_message.length();
}
- if (batch_bytes >= (FLAGS_duplicate_log_batch_megabytes << 20) ||
- cur_count == muts.size()) {
+ if (batch_count == muts.size() ||
+ batch_bytes >= dsn::replication::FLAGS_duplicate_log_batch_bytes) {
// since all the plog's mutations of replica belong to same gpid though the hash of
// mutation is different, use the last mutation of one batch to get and represents the
// current hash value, it will still send to remote correct replica
diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h
index daeaf34..2d04cbd 100644
--- a/src/server/pegasus_mutation_duplicator.h
+++ b/src/server/pegasus_mutation_duplicator.h
@@ -23,12 +23,12 @@
#include <dsn/dist/replication/replica_base.h>
#include <rrdb/rrdb.code.definition.h>
#include <dsn/utility/flags.h>
+#include <dsn/dist/replication/duplication_common.h>
#include "client_lib/pegasus_client_factory_impl.h"
namespace pegasus {
namespace server {
-DSN_DECLARE_uint32(duplicate_log_batch_megabytes);
using namespace dsn::literals::chrono_literals;
diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp
index f61fe69..15239db 100644
--- a/src/server/test/pegasus_mutation_duplicator_test.cpp
+++ b/src/server/test/pegasus_mutation_duplicator_test.cpp
@@ -57,7 +57,8 @@ public:
mutation_tuple_set muts;
uint total_bytes = 0;
- for (uint64_t i = 0; i < 4000; i++) {
+ uint batch_count = 0;
+ for (uint64_t i = 0; i < 400; i++) {
uint64_t ts = 200 + i;
dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
@@ -69,8 +70,12 @@ public:
muts.insert(std::make_tuple(ts, code, data));
total_bytes += data.length();
+
+ if (total_bytes >= FLAGS_duplicate_log_batch_bytes) {
+ batch_count++;
+ total_bytes = 0;
+ }
}
- auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1;
size_t total_shipped_size = 0;
auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get());
@@ -121,7 +126,8 @@ public:
mutation_tuple_set muts;
uint total_bytes = 0;
- for (uint64_t i = 0; i < 4000; i++) {
+ uint batch_count = 0;
+ for (uint64_t i = 0; i < 400; i++) {
uint64_t ts = 200 + i;
dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
@@ -133,8 +139,12 @@ public:
muts.insert(std::make_tuple(ts, code, data));
total_bytes += data.length();
+
+ if (total_bytes >= FLAGS_duplicate_log_batch_bytes) {
+ batch_count++;
+ total_bytes = 0;
+ }
}
- auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1;
auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get());
RPC_MOCKING(duplicate_rpc)
@@ -193,6 +203,7 @@ public:
mutation_tuple_set muts;
uint total_bytes = 0;
+ uint batch_count = 0;
for (uint64_t i = 0; i < total_size; i++) {
uint64_t ts = 200 + i;
dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
@@ -205,8 +216,12 @@ public:
muts.insert(std::make_tuple(ts, code, data));
total_bytes += data.length();
+
+ if (total_bytes >= FLAGS_duplicate_log_batch_bytes) {
+ batch_count++;
+ total_bytes = 0;
+ }
}
- auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1;
auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get());
RPC_MOCKING(duplicate_rpc)
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 8be8555..974bd08 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -449,6 +449,7 @@ static command_executor commands[] = {
"[-s|--skip_prompt] [-o|--output file_name]",
ddd_diagnose,
},
+ // todo(jiashuo1) [-f|--freezed] is Deprecated, it will be removed later
{"add_dup", "add duplication", "<app_name> <remote_cluster_name> [-f|--freezed]", add_dup},
{"query_dup", "query duplication info", "<app_name> [-d|--detail]", query_dup},
{"remove_dup", "remove duplication", "<app_name> <dup_id>", remove_dup},
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org