You are viewing a plain-text version of this content. The link to its canonical version is not preserved in this plain-text copy.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/12/23 14:17:58 UTC

[doris] branch master updated: [pipeline](fix) Fix bugs to pass all regression cases (#15306)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 06f71f2bca [pipeline](fix) Fix bugs to pass all regression cases (#15306)
06f71f2bca is described below

commit 06f71f2bca7f8f38e0eb58a43e661c8374e1de9f
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Dec 23 22:17:50 2022 +0800

    [pipeline](fix) Fix bugs to pass all regression cases (#15306)
    
    * [pipeline](fix) Fix bugs to pass all regression cases
    
    * update
    
    * update
---
 be/src/pipeline/CMakeLists.txt                     |  1 +
 be/src/pipeline/exec/operator.h                    |  2 +
 .../pipeline/exec/result_file_sink_operator.cpp    | 30 +++++-----
 .../src/pipeline/exec/result_file_sink_operator.h  | 41 ++++++++-----
 be/src/pipeline/pipeline_fragment_context.cpp      | 67 ++++++++++++++--------
 be/src/pipeline/pipeline_fragment_context.h        |  4 +-
 be/src/pipeline/pipeline_task.cpp                  |  2 +
 be/src/pipeline/task_scheduler.cpp                 |  1 +
 be/src/runtime/fragment_mgr.cpp                    |  6 +-
 be/src/vec/exec/join/vhash_join_node.cpp           | 19 +++++-
 be/src/vec/exec/join/vhash_join_node.h             |  4 +-
 be/src/vec/exec/join/vnested_loop_join_node.cpp    |  3 +-
 be/src/vec/exec/vrepeat_node.cpp                   |  2 +-
 be/src/vec/exec/vrepeat_node.h                     |  5 +-
 be/src/vec/exec/vtable_function_node.h             |  3 +-
 .../string_functions/test_split_part.groovy        |  2 +-
 16 files changed, 128 insertions(+), 64 deletions(-)

diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt
index ee49218de3..c7c5bf87af 100644
--- a/be/src/pipeline/CMakeLists.txt
+++ b/be/src/pipeline/CMakeLists.txt
@@ -37,6 +37,7 @@ set(PIPELINE_FILES
         exec/exchange_sink_operator.cpp
         exec/exchange_sink_buffer.cpp
         exec/result_sink_operator.cpp
+        exec/result_file_sink_operator.cpp
         exec/aggregation_sink_operator.cpp
         exec/aggregation_source_operator.cpp
         exec/hashjoin_build_sink.cpp
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 58c36b3a51..a5df8edd79 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -449,6 +449,8 @@ public:
                 source_state = SourceState::FINISHED;
             } else if (!node->need_more_input_data()) {
                 source_state = SourceState::MORE_DATA;
+            } else if (source_state == SourceState::MORE_DATA) {
+                source_state = _child_source_state;
             }
         }
         return Status::OK();
diff --git a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy b/be/src/pipeline/exec/result_file_sink_operator.cpp
similarity index 55%
copy from regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
copy to be/src/pipeline/exec/result_file_sink_operator.cpp
index 4369a1da1d..72f621ec8b 100644
--- a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -15,16 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_split_part") {
-  test {
-    sql """
-      select
-          name
-      from
-          tpch_tiny_nation
-      where
-          split_part("bCKHDX07at", "5.7.37", cast(name as int)) is not null;
-    """
-    exception "errCode = 2, detailMessage = [RUNTIME_ERROR]Argument at index 3 for function split_part must be constant"
-  }
-}
\ No newline at end of file
+#include "result_file_sink_operator.h"
+
+#include "vec/sink/vresult_file_sink.h"
+
+namespace doris::pipeline {
+
+ResultFileSinkOperatorBuilder::ResultFileSinkOperatorBuilder(int32_t id, DataSink* sink)
+        : DataSinkOperatorBuilder(id, "ResultSinkOperator", sink) {};
+
+OperatorPtr ResultFileSinkOperatorBuilder::build_operator() {
+    return std::make_shared<ResultFileSinkOperator>(this, _sink);
+}
+
+ResultFileSinkOperator::ResultFileSinkOperator(OperatorBuilderBase* operator_builder,
+                                               DataSink* sink)
+        : DataSinkOperator(operator_builder, sink) {};
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy b/be/src/pipeline/exec/result_file_sink_operator.h
similarity index 55%
copy from regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
copy to be/src/pipeline/exec/result_file_sink_operator.h
index 4369a1da1d..5f21b108a7 100644
--- a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -15,16 +15,31 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_split_part") {
-  test {
-    sql """
-      select
-          name
-      from
-          tpch_tiny_nation
-      where
-          split_part("bCKHDX07at", "5.7.37", cast(name as int)) is not null;
-    """
-    exception "errCode = 2, detailMessage = [RUNTIME_ERROR]Argument at index 3 for function split_part must be constant"
-  }
-}
\ No newline at end of file
+#pragma once
+
+#include "operator.h"
+
+namespace doris {
+namespace vectorized {
+class VResultFileSink;
+}
+
+namespace pipeline {
+
+class ResultFileSinkOperatorBuilder final
+        : public DataSinkOperatorBuilder<vectorized::VResultFileSink> {
+public:
+    ResultFileSinkOperatorBuilder(int32_t id, DataSink* sink);
+
+    OperatorPtr build_operator() override;
+};
+
+class ResultFileSinkOperator final : public DataSinkOperator<ResultFileSinkOperatorBuilder> {
+public:
+    ResultFileSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink);
+
+    bool can_write() override { return true; }
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 5b9918885a..4cfeb036ae 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -20,43 +20,44 @@
 #include <gen_cpp/DataSinks_types.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
-#include "exec/aggregation_sink_operator.h"
-#include "exec/aggregation_source_operator.h"
-#include "exec/analytic_sink_operator.h"
-#include "exec/analytic_source_operator.h"
 #include "exec/data_sink.h"
-#include "exec/datagen_operator.h"
-#include "exec/empty_set_operator.h"
-#include "exec/exchange_sink_operator.h"
-#include "exec/exchange_source_operator.h"
-#include "exec/hashjoin_build_sink.h"
-#include "exec/hashjoin_probe_operator.h"
-#include "exec/mysql_scan_operator.h"
-#include "exec/repeat_operator.h"
-#include "exec/result_sink_operator.h"
 #include "exec/scan_node.h"
-#include "exec/scan_operator.h"
-#include "exec/schema_scan_operator.h"
-#include "exec/select_operator.h"
-#include "exec/set_probe_sink_operator.h"
-#include "exec/set_sink_operator.h"
-#include "exec/set_source_operator.h"
-#include "exec/sort_sink_operator.h"
-#include "exec/sort_source_operator.h"
-#include "exec/streaming_aggregation_sink_operator.h"
-#include "exec/streaming_aggregation_source_operator.h"
-#include "exec/table_sink_operator.h"
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/HeartbeatService_types.h"
+#include "pipeline/exec/aggregation_sink_operator.h"
+#include "pipeline/exec/aggregation_source_operator.h"
+#include "pipeline/exec/analytic_sink_operator.h"
+#include "pipeline/exec/analytic_source_operator.h"
 #include "pipeline/exec/assert_num_rows_operator.h"
 #include "pipeline/exec/broker_scan_operator.h"
 #include "pipeline/exec/const_value_operator.h"
 #include "pipeline/exec/data_queue.h"
+#include "pipeline/exec/datagen_operator.h"
+#include "pipeline/exec/empty_set_operator.h"
+#include "pipeline/exec/exchange_sink_operator.h"
+#include "pipeline/exec/exchange_source_operator.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/hashjoin_probe_operator.h"
+#include "pipeline/exec/mysql_scan_operator.h"
 #include "pipeline/exec/nested_loop_join_build_operator.h"
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
 #include "pipeline/exec/olap_table_sink_operator.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/exec/repeat_operator.h"
+#include "pipeline/exec/result_file_sink_operator.h"
+#include "pipeline/exec/result_sink_operator.h"
+#include "pipeline/exec/scan_operator.h"
+#include "pipeline/exec/schema_scan_operator.h"
+#include "pipeline/exec/select_operator.h"
+#include "pipeline/exec/set_probe_sink_operator.h"
+#include "pipeline/exec/set_sink_operator.h"
+#include "pipeline/exec/set_source_operator.h"
+#include "pipeline/exec/sort_sink_operator.h"
+#include "pipeline/exec/sort_source_operator.h"
+#include "pipeline/exec/streaming_aggregation_sink_operator.h"
+#include "pipeline/exec/streaming_aggregation_source_operator.h"
 #include "pipeline/exec/table_function_operator.h"
+#include "pipeline/exec/table_sink_operator.h"
 #include "pipeline/exec/union_sink_operator.h"
 #include "pipeline/exec/union_source_operator.h"
 #include "pipeline_task.h"
@@ -78,6 +79,7 @@
 #include "vec/exec/vsort_node.h"
 #include "vec/exec/vunion_node.h"
 #include "vec/runtime/vdata_stream_mgr.h"
+#include "vec/sink/vresult_file_sink.h"
 #include "vec/sink/vresult_sink.h"
 
 using apache::thrift::transport::TTransportException;
@@ -99,6 +101,10 @@ PipelineFragmentContext::PipelineFragmentContext(
     _fragment_watcher.start();
 }
 
+PipelineFragmentContext::~PipelineFragmentContext() {
+    _call_back(_runtime_state.get(), &_exec_status);
+}
+
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
     if (!_runtime_state->is_cancelled()) {
@@ -547,6 +553,14 @@ Status PipelineFragmentContext::submit() {
     }
 }
 
+void PipelineFragmentContext::close_if_prepare_failed() {
+    for (auto& task : _tasks) {
+        DCHECK(!task->is_pending_finish());
+        WARN_IF_ERROR(task->close(), "close_if_prepare_failed failed: ");
+        close_a_pipeline();
+    }
+}
+
 // construct sink operator
 Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
     OperatorBuilderPtr sink_;
@@ -572,6 +586,11 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
         sink_ = std::make_shared<TableSinkOperatorBuilder>(next_operator_builder_id(), _sink.get());
         break;
     }
+    case TDataSinkType::RESULT_FILE_SINK: {
+        sink_ = std::make_shared<ResultFileSinkOperatorBuilder>(next_operator_builder_id(),
+                                                                _sink.get());
+        break;
+    }
     default:
         return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
     }
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 6f4f30ac91..0b36a3a421 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -41,7 +41,7 @@ public:
                             ExecEnv* exec_env,
                             std::function<void(RuntimeState*, Status*)> call_back);
 
-    ~PipelineFragmentContext() { _call_back(_runtime_state.get(), &_exec_status); }
+    ~PipelineFragmentContext();
 
     PipelinePtr add_pipeline();
 
@@ -58,6 +58,8 @@ public:
 
     Status submit();
 
+    void close_if_prepare_failed();
+
     void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; }
 
     ExecNode*& plan() { return _root_plan; }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index c55ef9e305..f6c2fc4f93 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -226,6 +226,8 @@ std::string PipelineTask::debug_string() const {
         fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '),
                        _operators[i]->debug_string());
     }
+    fmt::format_to(debug_string_buffer, "\n{}{}", std::string(_operators.size() * 2, ' '),
+                   _sink->debug_string());
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index c5e5ee57c1..3ee4cb82d1 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -249,6 +249,7 @@ void TaskScheduler::_do_work(size_t index) {
                                         status.to_string());
             // exec failed,cancel all fragment instance
             fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string());
+            fragment_ctx->send_report(true);
             _try_close_task(task, CANCELED);
             continue;
         }
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8801e733ee..0beccbdf9c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -747,8 +747,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
                         fragments_ctx, _exec_env, cb);
         {
             SCOPED_RAW_TIMER(&duration_ns);
-            RETURN_IF_ERROR(context->prepare(params));
+            auto prepare_st = context->prepare(params);
             g_fragmentmgr_prepare_latency << (duration_ns / 1000);
+            if (!prepare_st.ok()) {
+                context->close_if_prepare_failed();
+                return prepare_st;
+            }
         }
 
         std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 497f60b828..70bf40cce0 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -448,7 +448,7 @@ Status HashJoinNode::close(RuntimeState* state) {
     return VJoinNodeBase::close(state);
 }
 
-bool HashJoinNode::need_more_input_data() {
+bool HashJoinNode::need_more_input_data() const {
     return (_probe_block.rows() == 0 || _probe_index == _probe_block.rows()) && !_probe_eos &&
            !_short_circuit_for_null_in_probe_side;
 }
@@ -812,7 +812,9 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
             }
             _shared_hashtable_controller->signal(id());
         }
-    } else if (!_should_build_hash_table) {
+    } else if (!_should_build_hash_table &&
+               ((state->enable_pipeline_exec() && eos) || !state->enable_pipeline_exec())) {
+        // TODO: For pipeline engine, we should finish this pipeline task if _should_build_hash_table is false
         if (!state->enable_pipeline_exec()) {
             child(1)->close(state);
         }
@@ -858,12 +860,23 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
         }
     }
 
-    if (eos || !_should_build_hash_table) {
+    if (eos || (!_should_build_hash_table && !state->enable_pipeline_exec())) {
         _process_hashtable_ctx_variants_init(state);
     }
     return Status::OK();
 }
 
+void HashJoinNode::debug_string(int indentation_level, std::stringstream* out) const {
+    *out << string(indentation_level * 2, ' ');
+    *out << "HashJoin(need_more_input_data=" << (need_more_input_data() ? "true" : "false")
+         << " _probe_block.rows()=" << _probe_block.rows() << " _probe_index=" << _probe_index
+         << " _probe_eos=" << _probe_eos
+         << " _short_circuit_for_null_in_probe_side=" << _short_circuit_for_null_in_probe_side;
+    *out << ")\n children=(";
+    ExecNode::debug_string(indentation_level, out);
+    *out << ")";
+}
+
 template <bool BuildSide>
 Status HashJoinNode::_extract_join_column(Block& block, ColumnUInt8::MutablePtr& null_map,
                                           ColumnRawPtrs& raw_ptrs,
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 76fa064903..9e1f61050a 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -207,11 +207,13 @@ public:
     Status alloc_resource(RuntimeState* state) override;
     void release_resource(RuntimeState* state) override;
     Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override;
-    bool need_more_input_data();
+    bool need_more_input_data() const;
     Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override;
     void prepare_for_next() override;
 
+    void debug_string(int indentation_level, std::stringstream* out) const override;
+
 private:
     using VExprContexts = std::vector<VExprContext*>;
     // probe expr
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 7fc43fcf19..5bebcd0d10 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -611,8 +611,9 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) {
 void VNestedLoopJoinNode::debug_string(int indentation_level, std::stringstream* out) const {
     *out << std::string(indentation_level * 2, ' ');
     *out << "VNestedLoopJoinNode";
-    *out << "(eos=" << (_matched_rows_done ? "true" : "false")
+    *out << "(need_more_input_data=" << (need_more_input_data() ? "true" : "false")
          << " left_block_pos=" << _left_block_pos;
+    *out << ")\n children=";
     VJoinNodeBase::debug_string(indentation_level, out);
     *out << ")";
 }
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index 01db7d29ef..882619930e 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -223,7 +223,7 @@ Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block, bo
     return Status::OK();
 }
 
-bool VRepeatNode::need_more_input_data() {
+bool VRepeatNode::need_more_input_data() const {
     return !_child_block.rows() && !_child_eos;
 }
 
diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h
index 394690d729..5f038c606e 100644
--- a/be/src/vec/exec/vrepeat_node.h
+++ b/be/src/vec/exec/vrepeat_node.h
@@ -45,11 +45,10 @@ public:
 
     Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override;
-    bool need_more_input_data();
+    bool need_more_input_data() const;
     Block* get_child_block() { return &_child_block; }
 
-protected:
-    virtual void debug_string(int indentation_level, std::stringstream* out) const override;
+    void debug_string(int indentation_level, std::stringstream* out) const override;
 
 private:
     Status get_repeated_block(Block* child_block, int repeat_id_idx, Block* output_block);
diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h
index 85c0fca5be..85eccc047a 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -37,8 +37,7 @@ public:
         return _children[0]->open(state);
     }
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
-
-    bool need_more_input_data() { return !_child_block.rows() && !_child_eos; }
+    bool need_more_input_data() const { return !_child_block.rows() && !_child_eos; }
 
     void release_resource(doris::RuntimeState* state) override {
         Expr::close(_fn_ctxs, state);
diff --git a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy b/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
index 4369a1da1d..9b15ffb686 100644
--- a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
+++ b/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
@@ -25,6 +25,6 @@ suite("test_split_part") {
       where
           split_part("bCKHDX07at", "5.7.37", cast(name as int)) is not null;
     """
-    exception "errCode = 2, detailMessage = [RUNTIME_ERROR]Argument at index 3 for function split_part must be constant"
+    exception "[RUNTIME_ERROR]Argument at index 3 for function split_part must be constant"
   }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org