You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/03/28 07:05:20 UTC

[incubator-pegasus] branch duplication_dev updated: fix(dup_enhancement#22): change batch send config by using rdsn config value (#930)

This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch duplication_dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/duplication_dev by this push:
     new 29a75d6  fix(dup_enhancement#22): change batch send config by using rdsn config value (#930)
29a75d6 is described below

commit 29a75d6891f8d20f97aa6faa47830fbe3d276c3c
Author: Jiashuo <js...@live.com>
AuthorDate: Mon Mar 28 15:05:14 2022 +0800

    fix(dup_enhancement#22): change batch send config by using rdsn config value (#930)
---
 rdsn                                               |  2 +-
 src/server/pegasus_mutation_duplicator.cpp         | 16 ++++----------
 src/server/pegasus_mutation_duplicator.h           |  2 +-
 .../test/pegasus_mutation_duplicator_test.cpp      | 25 +++++++++++++++++-----
 src/shell/main.cpp                                 |  1 +
 5 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/rdsn b/rdsn
index 03f7b61..2be6eba 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 03f7b613d29c611844af831b9fcb2016620b9977
+Subproject commit 2be6eba9ab4cdbcfb1f3202bff9782a0c5bdf7f7
diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp
index 6fc4ecf..c2513a9 100644
--- a/src/server/pegasus_mutation_duplicator.cpp
+++ b/src/server/pegasus_mutation_duplicator.cpp
@@ -23,7 +23,6 @@
 
 #include <dsn/cpp/message_utils.h>
 #include <dsn/utility/chrono_literals.h>
-#include <dsn/dist/replication/duplication_common.h>
 #include <rrdb/rrdb.client.h>
 
 namespace dsn {
@@ -42,11 +41,6 @@ namespace replication {
 namespace pegasus {
 namespace server {
 
-DSN_DEFINE_uint32("pegasus",
-                  duplicate_log_batch_megabytes,
-                  4,
-                  "send mutation log batch size per rpc");
-
 using namespace dsn::literals::chrono_literals;
 
 /*extern*/ uint64_t get_hash_from_request(dsn::task_code tc, const dsn::blob &data)
@@ -190,13 +184,11 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
     _total_shipped_size = 0;
 
     auto batch_request = dsn::make_unique<dsn::apps::duplicate_request>();
+    uint batch_count = 0;
     uint batch_bytes = 0;
-    int cur_count = 0;
-
     for (auto mut : muts) {
         // mut: 0=timestamp, 1=rpc_code, 2=raw_message
-
-        cur_count++;
+        batch_count++;
         dsn::task_code rpc_code = std::get<1>(mut);
         dsn::blob raw_message = std::get<2>(mut);
         auto dreq = dsn::make_unique<dsn::apps::duplicate_request>();
@@ -216,8 +208,8 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
             batch_bytes += raw_message.length();
         }
 
-        if (batch_bytes >= (FLAGS_duplicate_log_batch_megabytes << 20) ||
-            cur_count == muts.size()) {
+        if (batch_count == muts.size() ||
+            batch_bytes >= dsn::replication::FLAGS_duplicate_log_batch_bytes) {
             // since all the plog's mutations of replica belong to same gpid though the hash of
             // mutation is different, use the last mutation of one batch to get and represents the
             // current hash value, it will still send to remote correct replica
diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h
index daeaf34..2d04cbd 100644
--- a/src/server/pegasus_mutation_duplicator.h
+++ b/src/server/pegasus_mutation_duplicator.h
@@ -23,12 +23,12 @@
 #include <dsn/dist/replication/replica_base.h>
 #include <rrdb/rrdb.code.definition.h>
 #include <dsn/utility/flags.h>
+#include <dsn/dist/replication/duplication_common.h>
 
 #include "client_lib/pegasus_client_factory_impl.h"
 
 namespace pegasus {
 namespace server {
-DSN_DECLARE_uint32(duplicate_log_batch_megabytes);
 
 using namespace dsn::literals::chrono_literals;
 
diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp
index f61fe69..15239db 100644
--- a/src/server/test/pegasus_mutation_duplicator_test.cpp
+++ b/src/server/test/pegasus_mutation_duplicator_test.cpp
@@ -57,7 +57,8 @@ public:
 
         mutation_tuple_set muts;
         uint total_bytes = 0;
-        for (uint64_t i = 0; i < 4000; i++) {
+        uint batch_count = 0;
+        for (uint64_t i = 0; i < 400; i++) {
             uint64_t ts = 200 + i;
             dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
 
@@ -69,8 +70,12 @@ public:
 
             muts.insert(std::make_tuple(ts, code, data));
             total_bytes += data.length();
+
+            if (total_bytes >= FLAGS_duplicate_log_batch_bytes) {
+                batch_count++;
+                total_bytes = 0;
+            }
         }
-        auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1;
 
         size_t total_shipped_size = 0;
         auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get());
@@ -121,7 +126,8 @@ public:
 
         mutation_tuple_set muts;
         uint total_bytes = 0;
-        for (uint64_t i = 0; i < 4000; i++) {
+        uint batch_count = 0;
+        for (uint64_t i = 0; i < 400; i++) {
             uint64_t ts = 200 + i;
             dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
 
@@ -133,8 +139,12 @@ public:
 
             muts.insert(std::make_tuple(ts, code, data));
             total_bytes += data.length();
+
+            if (total_bytes >= FLAGS_duplicate_log_batch_bytes) {
+                batch_count++;
+                total_bytes = 0;
+            }
         }
-        auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1;
 
         auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get());
         RPC_MOCKING(duplicate_rpc)
@@ -193,6 +203,7 @@ public:
 
         mutation_tuple_set muts;
         uint total_bytes = 0;
+        uint batch_count = 0;
         for (uint64_t i = 0; i < total_size; i++) {
             uint64_t ts = 200 + i;
             dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
@@ -205,8 +216,12 @@ public:
 
             muts.insert(std::make_tuple(ts, code, data));
             total_bytes += data.length();
+
+            if (total_bytes >= FLAGS_duplicate_log_batch_bytes) {
+                batch_count++;
+                total_bytes = 0;
+            }
         }
-        auto batch_count = total_bytes / (FLAGS_duplicate_log_batch_megabytes << 20) + 1;
 
         auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get());
         RPC_MOCKING(duplicate_rpc)
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 8be8555..974bd08 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -449,6 +449,7 @@ static command_executor commands[] = {
         "[-s|--skip_prompt] [-o|--output file_name]",
         ddd_diagnose,
     },
+    // todo(jiashuo1) [-f|--freezed] is Deprecated, it will be removed later
     {"add_dup", "add duplication", "<app_name> <remote_cluster_name> [-f|--freezed]", add_dup},
     {"query_dup", "query duplication info", "<app_name> [-d|--detail]", query_dup},
     {"remove_dup", "remove duplication", "<app_name> <dup_id>", remove_dup},

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org