You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original HTML page and is not preserved in this plain-text copy.
Posted to commits@doris.apache.org by yi...@apache.org on 2024/01/19 07:23:35 UTC

(doris) branch master updated: [refactor](close) refactor ispendingfinish logic and close logic to do close more quickly (#30021)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c2fb951f6b [refactor](close) refactor ispendingfinish logic and close logic to do close more quickly (#30021)
4c2fb951f6b is described below

commit 4c2fb951f6bb4af49fe611f9ac87a019b75c3f5c
Author: yiguolei <67...@qq.com>
AuthorDate: Fri Jan 19 15:23:27 2024 +0800

    [refactor](close) refactor ispendingfinish logic and close logic to do close more quickly (#30021)
---
 be/src/exec/data_sink.h                            |  2 +-
 be/src/pipeline/exec/jdbc_table_sink_operator.h    |  1 +
 be/src/pipeline/exec/olap_table_sink_operator.h    |  1 +
 be/src/pipeline/exec/olap_table_sink_v2_operator.h |  1 +
 be/src/pipeline/exec/operator.h                    |  2 +-
 .../pipeline/exec/partition_sort_sink_operator.cpp |  2 --
 be/src/pipeline/exec/result_file_sink_operator.cpp |  6 ++---
 be/src/pipeline/exec/result_file_sink_operator.h   |  1 +
 be/src/pipeline/exec/sort_sink_operator.cpp        |  1 -
 be/src/pipeline/exec/table_function_operator.cpp   |  1 -
 be/src/pipeline/pipeline_x/operator.cpp            | 14 +++++------
 be/src/pipeline/pipeline_x/operator.h              |  5 ++++
 be/src/runtime/result_writer.h                     |  2 ++
 be/src/runtime/runtime_state.cpp                   | 18 --------------
 be/src/runtime/runtime_state.h                     |  5 ----
 be/src/vec/exec/vpartition_sort_node.cpp           |  3 ---
 be/src/vec/exec/vsort_node.cpp                     |  2 --
 be/src/vec/exec/vtable_function_node.cpp           |  1 -
 be/src/vec/sink/async_writer_sink.h                | 23 +++++++++++-------
 be/src/vec/sink/vresult_file_sink.cpp              | 19 ++++++++++-----
 be/src/vec/sink/writer/async_result_writer.cpp     | 23 +++++++++++-------
 be/src/vec/sink/writer/async_result_writer.h       |  7 ------
 be/src/vec/sink/writer/vjdbc_table_writer.h        |  6 ++---
 be/src/vec/sink/writer/vodbc_table_writer.h        |  6 ++---
 be/src/vec/sink/writer/vtablet_writer.cpp          | 28 ++++------------------
 be/src/vec/sink/writer/vtablet_writer.h            |  8 ++-----
 26 files changed, 75 insertions(+), 113 deletions(-)

diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 3bf72ae5450..be6cfe236b7 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -74,7 +74,7 @@ public:
         return Status::OK();
     }
 
-    virtual bool is_close_done() { return true; }
+    [[nodiscard]] virtual bool is_pending_finish() const { return false; }
 
     // Releases all resources that were allocated in prepare()/send().
     // Further send() calls are illegal after calling close().
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h b/be/src/pipeline/exec/jdbc_table_sink_operator.h
index 41348ccaccf..33018f69da5 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.h
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h
@@ -58,6 +58,7 @@ public:
 private:
     friend class JdbcTableSinkLocalState;
     template <typename Writer, typename Parent>
+        requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
     friend class AsyncWriterSink;
 
     const RowDescriptor& _row_desc;
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h
index 6aa80776a3a..6707ddd86e6 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -102,6 +102,7 @@ public:
 private:
     friend class OlapTableSinkLocalState;
     template <typename Writer, typename Parent>
+        requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
     friend class AsyncWriterSink;
     const RowDescriptor& _row_desc;
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index 1e68f9e98d3..f1b5d6e9061 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -104,6 +104,7 @@ public:
 private:
     friend class OlapTableSinkV2LocalState;
     template <typename Writer, typename Parent>
+        requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
     friend class AsyncWriterSink;
     const RowDescriptor& _row_desc;
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 80184374b77..cd5fba5fee3 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -297,7 +297,7 @@ public:
         return _sink->try_close(state, state->query_status());
     }
 
-    [[nodiscard]] bool is_pending_finish() const override { return !_sink->is_close_done(); }
+    [[nodiscard]] bool is_pending_finish() const override { return _sink->is_pending_finish(); }
 
     Status close(RuntimeState* state) override {
         if (is_closed()) {
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 14d917d62e3..c09a6a90b95 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -120,8 +120,6 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                 RETURN_IF_ERROR(
                         _split_block_by_partition(input_block, state->batch_size(), local_state));
                 RETURN_IF_CANCELLED(state);
-                RETURN_IF_ERROR(
-                        state->check_query_state("VPartitionSortNode, while split input block."));
                 input_block->clear_column_data();
             }
         }
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 2b095748b15..71a2d539c95 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -173,9 +173,9 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
     }
 
     Status final_status = exec_status;
-    // close the writer
-    if (_writer && _writer->need_normal_close()) {
-        Status st = _writer->close();
+    // For pipelinex engine, the writer is closed in async thread process_block
+    if (_writer) {
+        Status st = _writer->get_writer_status();
         if (!st.ok() && exec_status.ok()) {
             // close file writer failed, should return this error to client
             final_status = st;
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h
index 57e1e8c9147..e196401991a 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -106,6 +106,7 @@ public:
 private:
     friend class ResultFileSinkLocalState;
     template <typename Writer, typename Parent>
+        requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
     friend class AsyncWriterSink;
 
     const RowDescriptor& _row_desc;
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index e2c851f758f..56a81422484 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -150,7 +150,6 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
     if (in_block->rows() > 0) {
         RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block));
         RETURN_IF_CANCELLED(state);
-        RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input."));
 
         // update runtime predicate
         if (_use_topn_opt) {
diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp
index cf14b3ec5e3..960fccff2b4 100644
--- a/be/src/pipeline/exec/table_function_operator.cpp
+++ b/be/src/pipeline/exec/table_function_operator.cpp
@@ -159,7 +159,6 @@ Status TableFunctionLocalState::get_expanded_block(RuntimeState* state,
 
     while (columns[p._child_slots.size()]->size() < state->batch_size()) {
         RETURN_IF_CANCELLED(state);
-        RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch."));
 
         if (_child_block->rows() == 0) {
             break;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index e38e7b39d95..bbb7473f868 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -497,6 +497,7 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
 }
 
 template <typename Writer, typename Parent>
+    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
 Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     _output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size());
@@ -516,6 +517,7 @@ Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkState
 }
 
 template <typename Writer, typename Parent>
+    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
 Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) {
     RETURN_IF_ERROR(Base::open(state));
     _writer->start_writer(state, _profile);
@@ -523,12 +525,14 @@ Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) {
 }
 
 template <typename Writer, typename Parent>
+    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
 Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, vectorized::Block* block,
                                              SourceState source_state) {
     return _writer->sink(block, source_state == SourceState::FINISHED);
 }
 
 template <typename Writer, typename Parent>
+    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
 Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
@@ -537,19 +541,13 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
     COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
     // if the init failed, the _writer may be nullptr. so here need check
     if (_writer) {
-        if (_writer->need_normal_close()) {
-            if (exec_status.ok() && !state->is_cancelled()) {
-                RETURN_IF_ERROR(_writer->commit_trans());
-            }
-            RETURN_IF_ERROR(_writer->close(exec_status));
-        } else {
-            RETURN_IF_ERROR(_writer->get_writer_status());
-        }
+        RETURN_IF_ERROR(_writer->get_writer_status());
     }
     return Base::close(state, exec_status);
 }
 
 template <typename Writer, typename Parent>
+    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
 Status AsyncWriterSink<Writer, Parent>::try_close(RuntimeState* state, Status exec_status) {
     if (state->is_cancelled() || !exec_status.ok()) {
         _writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index d46dc859b0c..ca1b224c5c5 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -22,6 +22,9 @@
 #include "pipeline/pipeline_x/dependency.h"
 #include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
 
+namespace doris::vectorized {
+class AsyncResultWriter;
+}
 namespace doris::pipeline {
 
 struct LocalExchangeSinkDependency;
@@ -569,6 +572,7 @@ public:
 
 protected:
     template <typename Writer, typename Parent>
+        requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
     friend class AsyncWriterSink;
     // _operator_id : the current Operator's ID, which is not visible to the user.
     // _node_id : the plan node ID corresponding to the Operator, which is visible on the profile.
@@ -680,6 +684,7 @@ public:
 };
 
 template <typename Writer, typename Parent>
+    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
 class AsyncWriterSink : public PipelineXSinkLocalState<FakeDependency> {
 public:
     using Base = PipelineXSinkLocalState<FakeDependency>;
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index f65f06399b8..5c49954ef2f 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -38,6 +38,8 @@ public:
 
     virtual Status init(RuntimeState* state) = 0;
 
+    virtual Status finish(RuntimeState* state) { return Status::OK(); }
+
     virtual Status close(Status s = Status::OK()) = 0;
 
     [[nodiscard]] virtual int64_t get_written_rows() const { return _written_rows; }
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 1a6f8a2661f..d84b86f2290 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -365,24 +365,6 @@ Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) {
     return _process_status;
 }
 
-Status RuntimeState::check_query_state(const std::string& msg) {
-    // TODO: it would be nice if this also checked for cancellation, but doing so breaks
-    // cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached.
-    //
-    // If the thread MemTrackerLimiter exceeds the limit, an error status is returned.
-    // Usually used after SCOPED_ATTACH_TASK, during query execution.
-    if (is_thread_context_init() && thread_context()->thread_mem_tracker()->limit_exceeded() &&
-        !config::enable_query_memory_overcommit) {
-        auto failed_msg =
-                fmt::format("{}, {}", msg,
-                            thread_context()->thread_mem_tracker()->tracker_limit_exceeded_str());
-        thread_context()->thread_mem_tracker()->print_log_usage(failed_msg);
-        log_error(failed_msg);
-        return Status::MemoryLimitExceeded(failed_msg);
-    }
-    return query_status();
-}
-
 const int64_t MAX_ERROR_NUM = 50;
 
 Status RuntimeState::create_error_log_file() {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index ede451cd92a..91443ef9492 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -236,11 +236,6 @@ public:
     // generic "Memory limit exceeded" error.
     Status set_mem_limit_exceeded(const std::string& msg = "Memory limit exceeded");
 
-    // Returns a non-OK status if query execution should stop (e.g., the query was cancelled
-    // or a mem limit was exceeded). Exec nodes should check this periodically so execution
-    // doesn't continue if the query terminates abnormally.
-    Status check_query_state(const std::string& msg);
-
     std::vector<std::string>& output_files() { return _output_files; }
 
     void set_import_label(const std::string& import_label) { _import_label = import_label; }
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp
index 6a7ebcffd1e..95c0abd72a8 100644
--- a/be/src/vec/exec/vpartition_sort_node.cpp
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -168,8 +168,6 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl
             } else {
                 RETURN_IF_ERROR(_split_block_by_partition(input_block, state->batch_size()));
                 RETURN_IF_CANCELLED(state);
-                RETURN_IF_ERROR(
-                        state->check_query_state("VPartitionSortNode, while split input block."));
                 input_block->clear_column_data();
             }
         }
@@ -237,7 +235,6 @@ Status VPartitionSortNode::alloc_resource(RuntimeState* state) {
     RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
     RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(state->check_query_state("VPartitionSortNode, while open."));
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 189c628ef0b..e313e3f74ac 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -130,7 +130,6 @@ Status VSortNode::alloc_resource(doris::RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::alloc_resource(state));
     RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(state->check_query_state("vsort, while open."));
 
     return Status::OK();
 }
@@ -140,7 +139,6 @@ Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool
     if (input_block->rows() > 0) {
         RETURN_IF_ERROR(_sorter->append_block(input_block));
         RETURN_IF_CANCELLED(state);
-        RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input."));
 
         // update runtime predicate
         if (_use_topn_opt) {
diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp
index 8affd4cbe7e..0c35fae806e 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -170,7 +170,6 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
 
     while (columns[_child_slots.size()]->size() < state->batch_size()) {
         RETURN_IF_CANCELLED(state);
-        RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch."));
 
         if (_child_block->rows() == 0) {
             break;
diff --git a/be/src/vec/sink/async_writer_sink.h b/be/src/vec/sink/async_writer_sink.h
index 600eb609281..1260382d6fa 100644
--- a/be/src/vec/sink/async_writer_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -42,6 +42,7 @@ namespace vectorized {
 class Block;
 
 template <typename Writer, const char* Name>
+    requires(std::is_base_of_v<AsyncResultWriter, Writer>)
 class AsyncWriterSink : public DataSink {
 public:
     AsyncWriterSink(const RowDescriptor& row_desc, const std::vector<TExpr>& t_exprs)
@@ -78,13 +79,21 @@ public:
         return Status::OK();
     }
 
+    // Non-pipeline engine will call this api to send data to sink destination
     Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override {
         SCOPED_TIMER(_exec_timer);
         COUNTER_UPDATE(_blocks_sent_counter, 1);
         COUNTER_UPDATE(_output_rows_counter, block->rows());
-        return _writer->write(*block);
+        Status st = _writer->write(*block);
+        // TODO: should this also check !state->is_cancelled()? It is unclear which scenario requires it.
+        if (st.ok() && eos) {
+            // If this is the last block, call finish to flush the buffer or commit transactions.
+            st = _writer->finish(state);
+        }
+        return st;
     }
 
+    // Pipeline engine will call this api to send data to destination. This is an async API.
     Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) override {
         return _writer->sink(block, eos);
     }
@@ -94,13 +103,11 @@ public:
     Status close(RuntimeState* state, Status exec_status) override {
         // if the init failed, the _writer may be nullptr. so here need check
         if (_writer) {
-            if (_writer->need_normal_close()) {
-                if (exec_status.ok() && !state->is_cancelled()) {
-                    RETURN_IF_ERROR(_writer->commit_trans());
-                }
-                RETURN_IF_ERROR(_writer->close(exec_status));
-            } else {
+            // For pipeline engine, the writer is always closed in async thread process_block
+            if (state->enable_pipeline_exec()) {
                 RETURN_IF_ERROR(_writer->get_writer_status());
+            } else {
+                RETURN_IF_ERROR(_writer->close(exec_status));
             }
         }
         return DataSink::close(state, exec_status);
@@ -113,7 +120,7 @@ public:
         return Status::OK();
     }
 
-    bool is_close_done() override { return !_writer->is_pending_finish(); }
+    [[nodiscard]] bool is_pending_finish() const override { return _writer->is_pending_finish(); }
 
 protected:
     const std::vector<TExpr>& _t_output_expr;
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index 02d77fa6d42..08dd881bf4b 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -113,14 +113,21 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
     }
 
     Status final_status = exec_status;
-    // close the writer
-    if (_writer && _writer->need_normal_close()) {
-        Status st = _writer->close();
-        if (!st.ok() && exec_status.ok()) {
-            // close file writer failed, should return this error to client
-            final_status = st;
+    Status writer_st = Status::OK();
+    if (_writer) {
+        // For pipeline engine, the writer is always closed in async thread process_block
+        if (state->enable_pipeline_exec()) {
+            writer_st = _writer->get_writer_status();
+        } else {
+            writer_st = _writer->close(exec_status);
         }
     }
+
+    if (!writer_st.ok() && exec_status.ok()) {
+        // close file writer failed, should return this error to client
+        final_status = writer_st;
+    }
+
     if (_is_top_sink) {
         // close sender, this is normal path end
         if (_sender) {
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 0ce3196e3a3..35d94fe3c47 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -125,15 +125,22 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
         }
     }
 
-    // if not in transaction or status is in error or force close we can do close in
-    // async IO thread
-    if (!_writer_status.ok() || !in_transaction()) {
-        std::lock_guard l(_m);
-        // Using lock to make sure the writer status is not modified
-        // There is a unique ptr err_msg in Status, if it is modified, the unique ptr
-        // maybe released. And it will core because use after free.
+    // If the last block was sent successfully, call finish to flush the buffer or commit
+    // transactions.
+    // Hold the lock so that the writer status is not modified concurrently: Status owns
+    // its err_msg through a unique_ptr, and modifying it while another thread reads it may
+    // release that pointer and crash with a use-after-free.
+    std::lock_guard l(_m);
+    if (_writer_status.ok() && _eos) {
+        _writer_status = finish(state);
+    }
+
+    if (_writer_status.ok()) {
         _writer_status = close(_writer_status);
-        _need_normal_close = false;
+    } else {
+        // If the writer already failed earlier, do not overwrite the writer status, so that the
+        // original failure reason is preserved.
+        static_cast<void>(close(_writer_status));
     }
     _writer_thread_closed = true;
     if (_finish_dependency) {
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index ced123267c5..5fbcca98af3 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -61,12 +61,6 @@ public:
 
     void force_close(Status s);
 
-    virtual bool in_transaction() { return false; }
-
-    virtual Status commit_trans() { return Status::OK(); }
-
-    bool need_normal_close() const { return _need_normal_close; }
-
     Status init(RuntimeState* state) override { return Status::OK(); }
 
     virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0;
@@ -110,7 +104,6 @@ private:
     std::deque<std::unique_ptr<Block>> _data_queue;
     Status _writer_status = Status::OK();
     bool _eos = false;
-    bool _need_normal_close = true;
     bool _writer_thread_closed = false;
 
     // Used by pipelineX
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h b/be/src/vec/sink/writer/vjdbc_table_writer.h
index 735c023fce5..a683259c992 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.h
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.h
@@ -46,11 +46,9 @@ public:
 
     Status write(vectorized::Block& block) override;
 
-    Status close(Status s) override { return JdbcConnector::close(s); }
-
-    bool in_transaction() override { return TableConnector::_is_in_transaction; }
+    Status finish(RuntimeState* state) override { return JdbcConnector::finish_trans(); }
 
-    Status commit_trans() override { return JdbcConnector::finish_trans(); }
+    Status close(Status s) override { return JdbcConnector::close(s); }
 
 private:
     JdbcConnectorParam _param;
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h b/be/src/vec/sink/writer/vodbc_table_writer.h
index a28947355e7..687b5106a8b 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.h
+++ b/be/src/vec/sink/writer/vodbc_table_writer.h
@@ -46,11 +46,9 @@ public:
 
     Status write(vectorized::Block& block) override;
 
-    Status close(Status s) override { return ODBCConnector::close(s); }
-
-    bool in_transaction() override { return TableConnector::_is_in_transaction; }
+    Status finish(RuntimeState* state) override { return ODBCConnector::finish_trans(); }
 
-    Status commit_trans() override { return ODBCConnector::finish_trans(); }
+    Status close(Status s) override { return ODBCConnector::close(s); }
 };
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 7c51b80a015..7af7b115c43 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -865,10 +865,6 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
     static_cast<void>(request->release_id());
 }
 
-bool VNodeChannel::is_send_data_rpc_done() const {
-    return _add_batches_finished || _cancelled;
-}
-
 Status VNodeChannel::close_wait(RuntimeState* state) {
     DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemInfo::process_full_gc(); });
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
@@ -891,7 +887,8 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
     }
 
     // waiting for finished, it may take a long time, so we couldn't set a timeout
-    // In pipeline, is_close_done() is false at this time, will not block.
+    // For pipeline engine, the close is called in async writer's process block method,
+    // so that it will not block pipeline thread.
     while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
         bthread_usleep(1000);
     }
@@ -1358,7 +1355,7 @@ Status VTabletWriter::_send_new_partition_batch() {
     return Status::OK();
 }
 
-Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) {
+void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status) {
     SCOPED_TIMER(_close_timer);
     Status status = exec_status;
 
@@ -1400,23 +1397,6 @@ Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) {
         _close_status = status;
         _close_wait = true;
     }
-
-    return Status::OK();
-}
-
-bool VTabletWriter::is_close_done() {
-    // Only after try_close, need to wait rpc end.
-    if (!_close_wait) {
-        return true;
-    }
-    bool close_done = true;
-    for (const auto& index_channel : _channels) {
-        index_channel->for_each_node_channel(
-                [&close_done](const std::shared_ptr<VNodeChannel>& ch) {
-                    close_done &= ch->is_send_data_rpc_done();
-                });
-    }
-    return close_done;
 }
 
 Status VTabletWriter::close(Status exec_status) {
@@ -1431,7 +1411,7 @@ Status VTabletWriter::close(Status exec_status) {
     SCOPED_TIMER(_profile->total_time_counter());
 
     // will make the last batch of request-> close_wait will wait this finished.
-    static_cast<void>(try_close(_state, exec_status));
+    _do_try_close(_state, exec_status);
 
     // If _close_status is not ok, all nodes have been canceled in try_close.
     if (_close_status.ok()) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 58ddbfd8869..cd2eafb1f24 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -259,8 +259,6 @@ public:
     // 2. just cancel()
     void mark_close();
 
-    bool is_send_data_rpc_done() const;
-
     bool is_closed() const { return _is_closed; }
     bool is_cancelled() const { return _cancelled; }
     std::string get_cancel_msg() {
@@ -527,15 +525,11 @@ public:
 
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
-    Status try_close(RuntimeState* state, Status exec_status);
-
     // the consumer func of sending pending batches in every NodeChannel.
     // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending.
     // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer
     void _send_batch_process();
 
-    bool is_close_done();
-
     Status on_partitions_created(TCreatePartitionResult* result);
 
     Status _send_new_partition_batch();
@@ -562,6 +556,8 @@ private:
 
     Status _incremental_open_node_channel(const std::vector<TOlapTablePartition>& partitions);
 
+    void _do_try_close(RuntimeState* state, const Status& exec_status);
+
     TDataSink _t_sink;
 
     std::shared_ptr<MemTracker> _mem_tracker;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org