You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/05/06 13:02:53 UTC

[GitHub] [incubator-doris] dataroaring commented on a diff in pull request #9414: [Draft] Improve tablet sink

dataroaring commented on code in PR #9414:
URL: https://github.com/apache/incubator-doris/pull/9414#discussion_r866797858


##########
be/src/exec/tablet_sink.cpp:
##########
@@ -397,9 +355,8 @@ Status NodeChannel::close_wait(RuntimeState* state) {
     }
 
     // waiting for finished, it may take a long time, so we couldn't set a timeout
-    while (!_add_batches_finished && !_cancelled) {
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-    }
+    brpc::StreamWait(_stream_id, nullptr);

Review Comment:
   We need to handle the case where StreamWait fails (check its return value).



##########
be/src/exec/tablet_sink.cpp:
##########
@@ -397,9 +355,8 @@ Status NodeChannel::close_wait(RuntimeState* state) {
     }
 
     // waiting for finished, it may take a long time, so we couldn't set a timeout
-    while (!_add_batches_finished && !_cancelled) {
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-    }
+    brpc::StreamWait(_stream_id, nullptr);
+    _add_batches_finished = true;

Review Comment:
   We should set this flag in try_send_batch instead of here.



##########
be/src/exec/tablet_sink.cpp:
##########
@@ -465,103 +430,81 @@ int NodeChannel::try_send_and_fetch_status(RuntimeState* state,
         return 0;
     }
 
-    if (!_add_batch_closure->try_set_in_flight()) {
-        return _send_finished ? 0 : 1;
-    }
-
     // We are sure that try_send_batch is not running
     if (_pending_batches_num > 0) {
-        auto s = thread_pool_token->submit_func(
-                std::bind(&NodeChannel::try_send_batch, this, state));
-        if (!s.ok()) {
-            _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed");
-            // clear in flight
-            _add_batch_closure->clear_in_flight();
-        }
-        // in_flight is cleared in closure::Run
-    } else {
-        // clear in flight
-        _add_batch_closure->clear_in_flight();
+        try_send_batch(state);
     }
-    return _send_finished ? 0 : 1;
+    return (_send_finished && _stream_msg.size() == 0) ? 0 : 1;
 }
 
 void NodeChannel::try_send_batch(RuntimeState* state) {
     SCOPED_ATTACH_TASK_THREAD(state, _node_channel_tracker);
     SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
-    AddBatchReq send_batch;
-    {
-        debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
-        std::lock_guard<std::mutex> l(_pending_batches_lock);
-        DCHECK(!_pending_batches.empty());
-        send_batch = std::move(_pending_batches.front());
-        _pending_batches.pop();
-        _pending_batches_num--;
-        _pending_batches_bytes -= send_batch.first->tuple_data_pool()->total_reserved_bytes();
-    }
+    if (_stream_msg.size() == 0) {
+        AddBatchReq send_batch;
+        {
+            debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+            std::lock_guard<std::mutex> l(_pending_batches_lock);
+            DCHECK(!_pending_batches.empty());
+            send_batch = std::move(_pending_batches.front());
+            _pending_batches.pop();
+            _pending_batches_num--;
+            _pending_batches_bytes -= send_batch.first->tuple_data_pool()->total_reserved_bytes();
+        }
 
-    auto row_batch = std::move(send_batch.first);
-    auto request = std::move(send_batch.second); // doesn't need to be saved in heap
+        auto row_batch = std::move(send_batch.first);
+        auto request = std::move(send_batch.second); // doesn't need to be saved in heap
 
-    // tablet_ids has already set when add row
-    request.set_packet_seq(_next_packet_seq);
-    if (row_batch->num_rows() > 0) {
-        SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
-        size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        Status st = row_batch->serialize(request.mutable_row_batch(), &uncompressed_bytes,
-                                         &compressed_bytes, _tuple_data_buffer_ptr);
-        if (!st.ok()) {
-            cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
-            _add_batch_closure->clear_in_flight();
-            return;
-        }
-        if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
-            LOG(WARNING) << "send batch too large, this rpc may failed. send size: "
-                         << compressed_bytes << ", threshold: " << config::brpc_max_body_size
-                         << ", " << channel_info();
+        // tablet_ids has already set when add row
+        request.set_packet_seq(_next_packet_seq);
+        if (row_batch->num_rows() > 0) {
+            SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
+            size_t uncompressed_bytes = 0, compressed_bytes = 0;
+            Status st = row_batch->serialize(request.mutable_row_batch(), &uncompressed_bytes,
+                                             &compressed_bytes, _tuple_data_buffer_ptr);
+            if (!st.ok()) {
+                cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
+                return;
+            }
+            if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
+                LOG(WARNING) << "send batch too large, this rpc may failed. send size: "
+                             << compressed_bytes << ", threshold: " << config::brpc_max_body_size
+                             << ", " << channel_info();
+            }
         }
-    }
 
-    int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
-    if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
-        if (remain_ms <= 0 && !request.eos()) {
-            cancel(fmt::format("{}, err: timeout", channel_info()));
-            _add_batch_closure->clear_in_flight();
-            return;
-        } else {
-            remain_ms = config::min_load_rpc_timeout_ms;
+        int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
+        if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
+            if (remain_ms <= 0 && !request.eos()) {
+                cancel(fmt::format("{}, err: timeout", channel_info()));
+                return;
+            } else {
+                remain_ms = config::min_load_rpc_timeout_ms;
+            }
         }
-    }
 
-    // After calling reset(), make sure that the rpc will be called finally.
-    // Otherwise, when calling _add_batch_closure->join(), it will be blocked forever.
-    // and _add_batch_closure->join() will be called in ~NodeChannel().
-    _add_batch_closure->reset();
-    _add_batch_closure->cntl.set_timeout_ms(remain_ms);
-    if (config::tablet_writer_ignore_eovercrowded) {
-        _add_batch_closure->cntl.ignore_eovercrowded();
-    }
+        if (request.eos()) {
+            for (auto pid : _parent->_partition_ids) {
+                request.add_partition_ids(pid);
+            }
 
-    if (request.eos()) {
-        for (auto pid : _parent->_partition_ids) {
-            request.add_partition_ids(pid);
+            // eos request must be the last request
+            _send_finished = true;
+            CHECK(_pending_batches_num == 0) << _pending_batches_num;
         }
-
-        // eos request must be the last request
-        _add_batch_closure->end_mark();
-        _send_finished = true;
-        CHECK(_pending_batches_num == 0) << _pending_batches_num;
+        butil::IOBufAsZeroCopyOutputStream wrapper(&_stream_msg);
+        request.SerializeToZeroCopyStream(&wrapper);
     }
-
-    if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch()) {
-        request_row_batch_transfer_attachment<PTabletWriterAddBatchRequest,
-                                              ReusableClosure<PTabletWriterAddBatchResult>>(
-                &request, _tuple_data_buffer, _add_batch_closure);
+    int res = brpc::StreamWrite(_stream_id, _stream_msg);
+    if (res == 0) {
+        _next_packet_seq++;
+        _stream_msg.clear();

Review Comment:
   Set `_add_batches_finished = request.eos();` here: once the last (eos) message has been pushed to the stream, the finished flag should be marked.



##########
be/src/exec/tablet_sink.cpp:
##########
@@ -465,103 +430,81 @@ int NodeChannel::try_send_and_fetch_status(RuntimeState* state,
         return 0;
     }
 
-    if (!_add_batch_closure->try_set_in_flight()) {
-        return _send_finished ? 0 : 1;
-    }
-
     // We are sure that try_send_batch is not running
     if (_pending_batches_num > 0) {
-        auto s = thread_pool_token->submit_func(
-                std::bind(&NodeChannel::try_send_batch, this, state));
-        if (!s.ok()) {
-            _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed");
-            // clear in flight
-            _add_batch_closure->clear_in_flight();
-        }
-        // in_flight is cleared in closure::Run
-    } else {
-        // clear in flight
-        _add_batch_closure->clear_in_flight();
+        try_send_batch(state);

Review Comment:
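   The condition should be `if (_pending_batches_num > 0 || _stream_msg.size() > 0)`. Otherwise a message left in `_stream_msg` by a failed StreamWrite is never retried once the pending queue is empty.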



##########
be/src/service/internal_service.cpp:
##########
@@ -130,15 +172,23 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
             brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-            attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl);
-            auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
+            auto st = Status::OK();
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
                              << ", id=" << request->id() << ", index_id=" << request->index_id()
                              << ", sender_id=" << request->sender_id()
                              << ", backend id=" << request->backend_id();
             }
             st.to_protobuf(response->mutable_status());
+            brpc::StreamOptions stream_options;
+            stream_options.handler = _tablet_add_batch_receiver.get();
+            brpc::StreamId stream_id;
+            if (brpc::StreamAccept(&stream_id, *cntl, &stream_options) != 0) {
+                cntl->SetFailed("Fail to accept stream");
+                return;
+            }
+            _stream_ids.push_back(stream_id);
+            LOG(INFO) << "Accept a stream";

Review Comment:
   We should not change this RPC. Instead, build the stream in the open RPC, guarded by a config option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org