You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by "yiguolei (via GitHub)" <gi...@apache.org> on 2023/06/16 06:57:57 UTC

[GitHub] [doris] yiguolei commented on a diff in pull request #20771: [fix](pipeline) refactor olap table sink close

yiguolei commented on code in PR #20771:
URL: https://github.com/apache/doris/pull/20771#discussion_r1231856957


##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -1393,133 +1385,226 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
     return Status::OK();
 }
 
-Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
-        return _close_status;
+Status VOlapTableSink::_cancel_channel_and_check_intolerable_failure(
+        Status status, const std::string& err_msg, const std::shared_ptr<IndexChannel> ich,
+        const std::shared_ptr<VNodeChannel> nch) {
+    LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << err_msg;
+    ich->mark_as_failed(nch->node_id(), nch->host(), err_msg, -1);
+    // cancel the node channel in best effort
+    nch->cancel(err_msg);
+
+    // check if index has intolerable failure
+    Status index_st = ich->check_intolerable_failure();
+    if (!index_st.ok()) {
+        status = index_st;
+    } else if (Status st = ich->check_tablet_received_rows_consistency(); !st.ok()) {
+        status = st;
+    }
+    return status;
+}
+
+void VOlapTableSink::_cancel_all_channel(Status status, const std::string& err_msg) {
+    for (auto channel : _channels) {
+        channel->for_each_node_channel([&status](const std::shared_ptr<VNodeChannel>& ch) {
+            ch->cancel(status.to_string());
+        });
+    }
+    LOG(INFO) << fmt::format(
+            "{}, close olap table sink. load_id={}, txn_id={}, canceled all node channels due to "
+            "error: {}",
+            err_msg, print_id(_load_id), _txn_id, status);
+}
+
+bool VOlapTableSink::is_pending_finish() {
+    if (_pending_finish) {
+        bool pending_finish = false;
+        for (auto index_channel : _channels) {
+            index_channel->for_each_node_channel(
+                    [&pending_finish](const std::shared_ptr<VNodeChannel>& ch) {
+                        pending_finish |= ch->is_pending_finish();
+                    });
+        }
+        _pending_finish = pending_finish;
+    }
+    return _pending_finish;
+}
+
+void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
+    if (!_pending_finish) {
+        return;
     }
     SCOPED_TIMER(_close_timer);
     Status status = exec_status;
     if (status.ok()) {
         // only if status is ok can we call this _profile->total_time_counter().
         // if status is not ok, this sink may not be prepared, so that _profile is null
         SCOPED_TIMER(_profile->total_time_counter());
-        // BE id -> add_batch method counter
-        std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map;
-        int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0, actual_consume_ns = 0,
-                total_add_batch_exec_time_ns = 0, max_add_batch_exec_time_ns = 0,
-                total_wait_exec_time_ns = 0, max_wait_exec_time_ns = 0, total_add_batch_num = 0,
-                num_node_channels = 0;
-        VNodeChannelStat channel_stat;
         {
-            if (config::enable_lazy_open_partition) {
-                for (auto index_channel : _channels) {
-                    index_channel->for_each_node_channel(
-                            [](const std::shared_ptr<VNodeChannel>& ch) {
-                                ch->open_partition_wait();
-                            });
-                }
-            }
-
-            for (auto index_channel : _channels) {
-                index_channel->for_each_node_channel(
-                        [](const std::shared_ptr<VNodeChannel>& ch) { ch->mark_close(); });
-                num_node_channels += index_channel->num_node_channels();
-            }
-
             for (auto index_channel : _channels) {
-                int64_t add_batch_exec_time = 0;
-                int64_t wait_exec_time = 0;
+                if (!status.ok()) {
+                    break;
+                }
                 index_channel->for_each_node_channel(
-                        [&index_channel, &state, &node_add_batch_counter_map, &serialize_batch_ns,
-                         &channel_stat, &queue_push_lock_ns, &actual_consume_ns,
-                         &total_add_batch_exec_time_ns, &add_batch_exec_time,
-                         &total_wait_exec_time_ns, &wait_exec_time,
-                         &total_add_batch_num](const std::shared_ptr<VNodeChannel>& ch) {
-                            auto s = ch->close_wait(state);
-                            if (!s.ok()) {
-                                auto err_msg = s.to_string();
-                                index_channel->mark_as_failed(ch->node_id(), ch->host(), err_msg,
-                                                              -1);
-                                // cancel the node channel in best effort
-                                ch->cancel(err_msg);
-                                LOG(WARNING) << ch->channel_info()
-                                             << ", close channel failed, err: " << err_msg;
+                        [this, &index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) {
+                            if (!status.ok() || ch->is_closed()) {
+                                return;
+                            }
+                            // first try close, all node channels will mark_close()
+                            // second and after try close, only check node channel is cancelled,
+                            // such as node channel has rpc error.
+                            if (this->_try_closed) {
+                                if (ch->is_cancelled()) {
+                                    status = this->_cancel_channel_and_check_intolerable_failure(
+                                            status, ch->get_cancel_msg(), index_channel, ch);
+                                }
+                            } else {
+                                auto s = ch->mark_close();
+                                if (!s.ok()) {
+                                    status = this->_cancel_channel_and_check_intolerable_failure(
+                                            status, s.to_string(), index_channel, ch);
+                                }
                             }
-                            ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns,
-                                            &channel_stat, &queue_push_lock_ns, &actual_consume_ns,
-                                            &total_add_batch_exec_time_ns, &add_batch_exec_time,
-                                            &total_wait_exec_time_ns, &wait_exec_time,
-                                            &total_add_batch_num);
                         });
-
-                if (add_batch_exec_time > max_add_batch_exec_time_ns) {
-                    max_add_batch_exec_time_ns = add_batch_exec_time;
-                }
-                if (wait_exec_time > max_wait_exec_time_ns) {
-                    max_wait_exec_time_ns = wait_exec_time;
-                }
-
-                // check if index has intolerable failure
-                Status index_st = index_channel->check_intolerable_failure();
-                if (!index_st.ok()) {
-                    status = index_st;
-                } else if (Status st = index_channel->check_tablet_received_rows_consistency();
-                           !st.ok()) {
-                    status = st;
-                }
             } // end for index channels
         }
-        // TODO need to be improved
-        LOG(INFO) << "total mem_exceeded_block_ns=" << channel_stat.mem_exceeded_block_ns
-                  << ", total queue_push_lock_ns=" << queue_push_lock_ns
-                  << ", total actual_consume_ns=" << actual_consume_ns
-                  << ", load id=" << print_id(_load_id);
-
-        COUNTER_SET(_input_rows_counter, _number_input_rows);
-        COUNTER_SET(_output_rows_counter, _number_output_rows);
-        COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
-        COUNTER_SET(_send_data_timer, _send_data_ns);
-        COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
-        COUNTER_SET(_filter_timer, _filter_ns);
-        COUNTER_SET(_append_node_channel_timer, channel_stat.append_node_channel_ns);
-        COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
-        COUNTER_SET(_wait_mem_limit_timer, channel_stat.mem_exceeded_block_ns);
-        COUNTER_SET(_validate_data_timer, _validate_data_ns);
-        COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
-        COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
-        COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns);
-        COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns);
-        COUNTER_SET(_total_wait_exec_timer, total_wait_exec_time_ns);
-        COUNTER_SET(_max_wait_exec_timer, max_wait_exec_time_ns);
-        COUNTER_SET(_add_batch_number, total_add_batch_num);
-        COUNTER_SET(_num_node_channels, num_node_channels);
-        // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
-        int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
-                                      state->num_rows_load_unselected();
-        state->set_num_rows_load_total(num_rows_load_total);
-        state->update_num_rows_load_filtered(_number_filtered_rows);
-        state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows);
-
-        // print log of add batch time of all node, for tracing load performance easily
-        std::stringstream ss;
-        ss << "finished to close olap table sink. load_id=" << print_id(_load_id)
-           << ", txn_id=" << _txn_id
-           << ", node add batch time(ms)/wait execution time(ms)/close time(ms)/num: ";
-        for (auto const& pair : node_add_batch_counter_map) {
-            ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000)
-               << ")(" << (pair.second.add_batch_wait_execution_time_us / 1000) << ")("
-               << pair.second.close_wait_time_ms << ")(" << pair.second.add_batch_num << ")} ";
+    }
+
+    if (status.ok()) {
+        if (!_try_closed) {
+            LOG(INFO) << "try close olap table sink. load_id=" << print_id(_load_id)
+                      << ", txn_id=" << _txn_id;
         }
-        LOG(INFO) << ss.str();
     } else {
-        for (auto channel : _channels) {
-            channel->for_each_node_channel([&status](const std::shared_ptr<VNodeChannel>& ch) {
-                ch->cancel(status.to_string());
-            });
+        _cancel_all_channel(status, "in try close");
+        _pending_finish = false;
+        _close_status = status;
+    }
+    _try_closed = true;
+}
+
+Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return _close_status;
+    }
+    try_close(state, exec_status);
+    SCOPED_TIMER(_close_timer);
+    vectorized::VExpr::close(_output_vexpr_ctxs, state);

Review Comment:
   This line is useless now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org