You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/07/08 06:37:42 UTC

[GitHub] [doris] morningman commented on a diff in pull request #10298: [Load][Enhancement] Support single replica load

morningman commented on code in PR #10298:
URL: https://github.com/apache/doris/pull/10298#discussion_r913369254


##########
gensrc/proto/internal_service.proto:
##########
@@ -469,6 +492,32 @@ message PResetRPCChannelResponse {
 
 message PEmptyRequest {};
 
+message PTabletWriteSlaveRequest {
+    required RowsetMetaPB rowset_meta = 1;

Review Comment:
   Use `optional` for all fields.
   The same applies to the other newly added structs.



##########
be/src/exec/tablet_sink.h:
##########
@@ -287,6 +291,7 @@ class NodeChannel {
     RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr;
 
     std::vector<TTabletWithPartition> _all_tablets;
+    std::unordered_map<int64_t, std::vector<int64_t>> _slave_tablet_nodes;

Review Comment:
   Add a comment explaining what these `int64_t` values represent.



##########
fe/fe-core/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -722,6 +722,24 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static boolean disable_show_stream_load = false;
 
+    /**
+     * Whether to enable to write single replica for stream load.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_single_replica_stream_load = false;
+
+    /**
+     * Whether to enable to write single replica for broker load.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_single_replica_broker_load = false;
+
+    /**
+     * Whether to enable to write single replica for insert.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_single_replica_insert = false;

Review Comment:
   It is better to use a load property for stream load and broker load,
   and a session variable to control the insert operation.



##########
be/src/common/config.h:
##########
@@ -35,6 +35,11 @@ CONF_Int32(brpc_port, "8060");
 // the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
 CONF_Int32(brpc_num_threads, "-1");
 
+// port to brpc server for single replica load
+CONF_Int32(single_replica_load_brpc_port, "8070");

Review Comment:
   The notification RPC is a lightweight RPC. Do we really need to create a new brpc port for it?



##########
be/src/runtime/tablets_channel.cpp:
##########
@@ -102,7 +111,7 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
         _state = kFinished;
         // All senders are closed
         // 1. close all delta writers
-        std::vector<DeltaWriter*> need_wait_writers;
+        std::set<DeltaWriter*> need_wait_writers;

Review Comment:
   Why change to std::set?



##########
be/src/olap/delta_writer.h:
##########
@@ -67,7 +67,13 @@ class DeltaWriter {
     Status close();
     // wait for all memtables to be flushed.
     // mem_consumption() should be 0 after this function returns.
-    Status close_wait();
+    Status close_wait(PSlaveTabletNodes slave_tablet_nodes, const bool write_single_replica);

Review Comment:
   ```suggestion
       Status close_wait(const PSlaveTabletNodes& slave_tablet_nodes, bool write_single_replica);
   ```



##########
be/src/exec/tablet_sink.h:
##########
@@ -174,6 +174,10 @@ class NodeChannel {
 
     virtual Status init(RuntimeState* state);
 
+    void add_slave_tablet_nodes(int64_t tablet_id, std::vector<int64_t> slave_nodes) {

Review Comment:
   ```suggestion
       void add_slave_tablet_nodes(int64_t tablet_id, const std::vector<int64_t>& slave_nodes) {
   ```



##########
be/src/runtime/tablets_channel.cpp:
##########
@@ -124,20 +133,46 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
             }
         }
 
+        _write_single_replica = write_single_replica;
+
         // 2. wait delta writers and build the tablet vector
         for (auto writer : need_wait_writers) {
+            PSlaveTabletNodes slave_nodes;
+            if (write_single_replica) {
+                slave_nodes = slave_tablet_nodes.at(writer->tablet_id());
+            }
             // close may return failed, but no need to handle it here.
             // tablet_vec will only contains success tablet, and then let FE judge it.
-            _close_wait(writer, tablet_vec, tablet_errors);
+            _close_wait(writer, tablet_vec, tablet_errors, slave_nodes, write_single_replica);
+        }
+
+        if (write_single_replica) {
+            CountDownLatch latch(1);
+            while (need_wait_writers.size() > 0 &&
+                   (time(nullptr) - parent->last_updated_time()) < (parent->timeout() * 0.9)) {
+                for (auto writer : need_wait_writers) {
+                    bool is_done = writer->check_slave_replicas_done(success_slave_tablet_node_ids);
+                    if (is_done) {
+                        need_wait_writers.erase(writer);

Review Comment:
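   Is it safe to erase an element from the `need_wait_writers` set while it is being iterated? Erasing the current element inside a range-for invalidates the loop's iterator.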



##########
be/src/runtime/tablets_channel.cpp:
##########
@@ -124,20 +133,46 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
             }
         }
 
+        _write_single_replica = write_single_replica;
+
         // 2. wait delta writers and build the tablet vector
         for (auto writer : need_wait_writers) {
+            PSlaveTabletNodes slave_nodes;
+            if (write_single_replica) {
+                slave_nodes = slave_tablet_nodes.at(writer->tablet_id());
+            }
             // close may return failed, but no need to handle it here.
             // tablet_vec will only contains success tablet, and then let FE judge it.
-            _close_wait(writer, tablet_vec, tablet_errors);
+            _close_wait(writer, tablet_vec, tablet_errors, slave_nodes, write_single_replica);
+        }
+
+        if (write_single_replica) {
+            CountDownLatch latch(1);
+            while (need_wait_writers.size() > 0 &&
+                   (time(nullptr) - parent->last_updated_time()) < (parent->timeout() * 0.9)) {

Review Comment:
   Add a comment explaining why the timeout is multiplied by 0.9.



##########
be/src/runtime/tablets_channel.cpp:
##########
@@ -124,20 +133,46 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
             }
         }
 
+        _write_single_replica = write_single_replica;
+
         // 2. wait delta writers and build the tablet vector
         for (auto writer : need_wait_writers) {
+            PSlaveTabletNodes slave_nodes;
+            if (write_single_replica) {
+                slave_nodes = slave_tablet_nodes.at(writer->tablet_id());
+            }
             // close may return failed, but no need to handle it here.
             // tablet_vec will only contains success tablet, and then let FE judge it.
-            _close_wait(writer, tablet_vec, tablet_errors);
+            _close_wait(writer, tablet_vec, tablet_errors, slave_nodes, write_single_replica);
+        }
+
+        if (write_single_replica) {
+            CountDownLatch latch(1);
+            while (need_wait_writers.size() > 0 &&
+                   (time(nullptr) - parent->last_updated_time()) < (parent->timeout() * 0.9)) {
+                for (auto writer : need_wait_writers) {
+                    bool is_done = writer->check_slave_replicas_done(success_slave_tablet_node_ids);
+                    if (is_done) {
+                        need_wait_writers.erase(writer);
+                    }
+                }
+                latch.wait_for(std::chrono::milliseconds(100));

Review Comment:
   Why use a CountDownLatch here?
   I think we can just call `sleep()`.



##########
be/src/service/internal_service.h:
##########
@@ -175,9 +183,13 @@ class PInternalServiceImpl : public PBackendService {
                                   PTabletWriterAddBlockResult* response,
                                   google::protobuf::Closure* done);
 
+    void _response_pull_slave_rowset(std::string remote_host, int64_t brpc_port, int64_t txn_id,

Review Comment:
   ```suggestion
       void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port, int64_t txn_id,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org