You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/12/23 14:17:58 UTC
[doris] branch master updated: [pipeline](fix) Fix bugs to pass all regression cases (#15306)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 06f71f2bca [pipeline](fix) Fix bugs to pass all regression cases (#15306)
06f71f2bca is described below
commit 06f71f2bca7f8f38e0eb58a43e661c8374e1de9f
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Dec 23 22:17:50 2022 +0800
[pipeline](fix) Fix bugs to pass all regression cases (#15306)
* [pipeline](fix) Fix bugs to pass all regression cases
* update
* update
---
be/src/pipeline/CMakeLists.txt | 1 +
be/src/pipeline/exec/operator.h | 2 +
.../pipeline/exec/result_file_sink_operator.cpp | 30 +++++-----
.../src/pipeline/exec/result_file_sink_operator.h | 41 ++++++++-----
be/src/pipeline/pipeline_fragment_context.cpp | 67 ++++++++++++++--------
be/src/pipeline/pipeline_fragment_context.h | 4 +-
be/src/pipeline/pipeline_task.cpp | 2 +
be/src/pipeline/task_scheduler.cpp | 1 +
be/src/runtime/fragment_mgr.cpp | 6 +-
be/src/vec/exec/join/vhash_join_node.cpp | 19 +++++-
be/src/vec/exec/join/vhash_join_node.h | 4 +-
be/src/vec/exec/join/vnested_loop_join_node.cpp | 3 +-
be/src/vec/exec/vrepeat_node.cpp | 2 +-
be/src/vec/exec/vrepeat_node.h | 5 +-
be/src/vec/exec/vtable_function_node.h | 3 +-
.../string_functions/test_split_part.groovy | 2 +-
16 files changed, 128 insertions(+), 64 deletions(-)
diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt
index ee49218de3..c7c5bf87af 100644
--- a/be/src/pipeline/CMakeLists.txt
+++ b/be/src/pipeline/CMakeLists.txt
@@ -37,6 +37,7 @@ set(PIPELINE_FILES
exec/exchange_sink_operator.cpp
exec/exchange_sink_buffer.cpp
exec/result_sink_operator.cpp
+ exec/result_file_sink_operator.cpp
exec/aggregation_sink_operator.cpp
exec/aggregation_source_operator.cpp
exec/hashjoin_build_sink.cpp
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 58c36b3a51..a5df8edd79 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -449,6 +449,8 @@ public:
source_state = SourceState::FINISHED;
} else if (!node->need_more_input_data()) {
source_state = SourceState::MORE_DATA;
+ } else if (source_state == SourceState::MORE_DATA) {
+ source_state = _child_source_state;
}
}
return Status::OK();
diff --git a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy b/be/src/pipeline/exec/result_file_sink_operator.cpp
similarity index 55%
copy from regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
copy to be/src/pipeline/exec/result_file_sink_operator.cpp
index 4369a1da1d..72f621ec8b 100644
--- a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -15,16 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_split_part") {
- test {
- sql """
- select
- name
- from
- tpch_tiny_nation
- where
- split_part("bCKHDX07at", "5.7.37", cast(name as int)) is not null;
- """
- exception "errCode = 2, detailMessage = [RUNTIME_ERROR]Argument at index 3 for function split_part must be constant"
- }
-}
\ No newline at end of file
+#include "result_file_sink_operator.h"
+
+#include "vec/sink/vresult_file_sink.h"
+
+namespace doris::pipeline {
+
+ResultFileSinkOperatorBuilder::ResultFileSinkOperatorBuilder(int32_t id, DataSink* sink)
+ : DataSinkOperatorBuilder(id, "ResultSinkOperator", sink) {};
+
+OperatorPtr ResultFileSinkOperatorBuilder::build_operator() {
+ return std::make_shared<ResultFileSinkOperator>(this, _sink);
+}
+
+ResultFileSinkOperator::ResultFileSinkOperator(OperatorBuilderBase* operator_builder,
+ DataSink* sink)
+ : DataSinkOperator(operator_builder, sink) {};
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy b/be/src/pipeline/exec/result_file_sink_operator.h
similarity index 55%
copy from regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
copy to be/src/pipeline/exec/result_file_sink_operator.h
index 4369a1da1d..5f21b108a7 100644
--- a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -15,16 +15,31 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_split_part") {
- test {
- sql """
- select
- name
- from
- tpch_tiny_nation
- where
- split_part("bCKHDX07at", "5.7.37", cast(name as int)) is not null;
- """
- exception "errCode = 2, detailMessage = [RUNTIME_ERROR]Argument at index 3 for function split_part must be constant"
- }
-}
\ No newline at end of file
+#pragma once
+
+#include "operator.h"
+
+namespace doris {
+namespace vectorized {
+class VResultFileSink;
+}
+
+namespace pipeline {
+
+class ResultFileSinkOperatorBuilder final
+ : public DataSinkOperatorBuilder<vectorized::VResultFileSink> {
+public:
+ ResultFileSinkOperatorBuilder(int32_t id, DataSink* sink);
+
+ OperatorPtr build_operator() override;
+};
+
+class ResultFileSinkOperator final : public DataSinkOperator<ResultFileSinkOperatorBuilder> {
+public:
+ ResultFileSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink);
+
+ bool can_write() override { return true; }
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 5b9918885a..4cfeb036ae 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -20,43 +20,44 @@
#include <gen_cpp/DataSinks_types.h>
#include <thrift/protocol/TDebugProtocol.h>
-#include "exec/aggregation_sink_operator.h"
-#include "exec/aggregation_source_operator.h"
-#include "exec/analytic_sink_operator.h"
-#include "exec/analytic_source_operator.h"
#include "exec/data_sink.h"
-#include "exec/datagen_operator.h"
-#include "exec/empty_set_operator.h"
-#include "exec/exchange_sink_operator.h"
-#include "exec/exchange_source_operator.h"
-#include "exec/hashjoin_build_sink.h"
-#include "exec/hashjoin_probe_operator.h"
-#include "exec/mysql_scan_operator.h"
-#include "exec/repeat_operator.h"
-#include "exec/result_sink_operator.h"
#include "exec/scan_node.h"
-#include "exec/scan_operator.h"
-#include "exec/schema_scan_operator.h"
-#include "exec/select_operator.h"
-#include "exec/set_probe_sink_operator.h"
-#include "exec/set_sink_operator.h"
-#include "exec/set_source_operator.h"
-#include "exec/sort_sink_operator.h"
-#include "exec/sort_source_operator.h"
-#include "exec/streaming_aggregation_sink_operator.h"
-#include "exec/streaming_aggregation_source_operator.h"
-#include "exec/table_sink_operator.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService_types.h"
+#include "pipeline/exec/aggregation_sink_operator.h"
+#include "pipeline/exec/aggregation_source_operator.h"
+#include "pipeline/exec/analytic_sink_operator.h"
+#include "pipeline/exec/analytic_source_operator.h"
#include "pipeline/exec/assert_num_rows_operator.h"
#include "pipeline/exec/broker_scan_operator.h"
#include "pipeline/exec/const_value_operator.h"
#include "pipeline/exec/data_queue.h"
+#include "pipeline/exec/datagen_operator.h"
+#include "pipeline/exec/empty_set_operator.h"
+#include "pipeline/exec/exchange_sink_operator.h"
+#include "pipeline/exec/exchange_source_operator.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/hashjoin_probe_operator.h"
+#include "pipeline/exec/mysql_scan_operator.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/operator.h"
+#include "pipeline/exec/repeat_operator.h"
+#include "pipeline/exec/result_file_sink_operator.h"
+#include "pipeline/exec/result_sink_operator.h"
+#include "pipeline/exec/scan_operator.h"
+#include "pipeline/exec/schema_scan_operator.h"
+#include "pipeline/exec/select_operator.h"
+#include "pipeline/exec/set_probe_sink_operator.h"
+#include "pipeline/exec/set_sink_operator.h"
+#include "pipeline/exec/set_source_operator.h"
+#include "pipeline/exec/sort_sink_operator.h"
+#include "pipeline/exec/sort_source_operator.h"
+#include "pipeline/exec/streaming_aggregation_sink_operator.h"
+#include "pipeline/exec/streaming_aggregation_source_operator.h"
#include "pipeline/exec/table_function_operator.h"
+#include "pipeline/exec/table_sink_operator.h"
#include "pipeline/exec/union_sink_operator.h"
#include "pipeline/exec/union_source_operator.h"
#include "pipeline_task.h"
@@ -78,6 +79,7 @@
#include "vec/exec/vsort_node.h"
#include "vec/exec/vunion_node.h"
#include "vec/runtime/vdata_stream_mgr.h"
+#include "vec/sink/vresult_file_sink.h"
#include "vec/sink/vresult_sink.h"
using apache::thrift::transport::TTransportException;
@@ -99,6 +101,10 @@ PipelineFragmentContext::PipelineFragmentContext(
_fragment_watcher.start();
}
+PipelineFragmentContext::~PipelineFragmentContext() {
+ _call_back(_runtime_state.get(), &_exec_status);
+}
+
void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
if (!_runtime_state->is_cancelled()) {
@@ -547,6 +553,14 @@ Status PipelineFragmentContext::submit() {
}
}
+void PipelineFragmentContext::close_if_prepare_failed() {
+ for (auto& task : _tasks) {
+ DCHECK(!task->is_pending_finish());
+ WARN_IF_ERROR(task->close(), "close_if_prepare_failed failed: ");
+ close_a_pipeline();
+ }
+}
+
// construct sink operator
Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
OperatorBuilderPtr sink_;
@@ -572,6 +586,11 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
sink_ = std::make_shared<TableSinkOperatorBuilder>(next_operator_builder_id(), _sink.get());
break;
}
+ case TDataSinkType::RESULT_FILE_SINK: {
+ sink_ = std::make_shared<ResultFileSinkOperatorBuilder>(next_operator_builder_id(),
+ _sink.get());
+ break;
+ }
default:
return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
}
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 6f4f30ac91..0b36a3a421 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -41,7 +41,7 @@ public:
ExecEnv* exec_env,
std::function<void(RuntimeState*, Status*)> call_back);
- ~PipelineFragmentContext() { _call_back(_runtime_state.get(), &_exec_status); }
+ ~PipelineFragmentContext();
PipelinePtr add_pipeline();
@@ -58,6 +58,8 @@ public:
Status submit();
+ void close_if_prepare_failed();
+
void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; }
ExecNode*& plan() { return _root_plan; }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index c55ef9e305..f6c2fc4f93 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -226,6 +226,8 @@ std::string PipelineTask::debug_string() const {
fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '),
_operators[i]->debug_string());
}
+ fmt::format_to(debug_string_buffer, "\n{}{}", std::string(_operators.size() * 2, ' '),
+ _sink->debug_string());
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index c5e5ee57c1..3ee4cb82d1 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -249,6 +249,7 @@ void TaskScheduler::_do_work(size_t index) {
status.to_string());
// exec failed,cancel all fragment instance
fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string());
+ fragment_ctx->send_report(true);
_try_close_task(task, CANCELED);
continue;
}
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8801e733ee..0beccbdf9c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -747,8 +747,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
fragments_ctx, _exec_env, cb);
{
SCOPED_RAW_TIMER(&duration_ns);
- RETURN_IF_ERROR(context->prepare(params));
+ auto prepare_st = context->prepare(params);
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
+ if (!prepare_st.ok()) {
+ context->close_if_prepare_failed();
+ return prepare_st;
+ }
}
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 497f60b828..70bf40cce0 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -448,7 +448,7 @@ Status HashJoinNode::close(RuntimeState* state) {
return VJoinNodeBase::close(state);
}
-bool HashJoinNode::need_more_input_data() {
+bool HashJoinNode::need_more_input_data() const {
return (_probe_block.rows() == 0 || _probe_index == _probe_block.rows()) && !_probe_eos &&
!_short_circuit_for_null_in_probe_side;
}
@@ -812,7 +812,9 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
}
_shared_hashtable_controller->signal(id());
}
- } else if (!_should_build_hash_table) {
+ } else if (!_should_build_hash_table &&
+ ((state->enable_pipeline_exec() && eos) || !state->enable_pipeline_exec())) {
+ // TODO: For pipeline engine, we should finish this pipeline task if _should_build_hash_table is false
if (!state->enable_pipeline_exec()) {
child(1)->close(state);
}
@@ -858,12 +860,23 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
}
}
- if (eos || !_should_build_hash_table) {
+ if (eos || (!_should_build_hash_table && !state->enable_pipeline_exec())) {
_process_hashtable_ctx_variants_init(state);
}
return Status::OK();
}
+void HashJoinNode::debug_string(int indentation_level, std::stringstream* out) const {
+ *out << string(indentation_level * 2, ' ');
+ *out << "HashJoin(need_more_input_data=" << (need_more_input_data() ? "true" : "false")
+ << " _probe_block.rows()=" << _probe_block.rows() << " _probe_index=" << _probe_index
+ << " _probe_eos=" << _probe_eos
+ << " _short_circuit_for_null_in_probe_side=" << _short_circuit_for_null_in_probe_side;
+ *out << ")\n children=(";
+ ExecNode::debug_string(indentation_level, out);
+ *out << ")";
+}
+
template <bool BuildSide>
Status HashJoinNode::_extract_join_column(Block& block, ColumnUInt8::MutablePtr& null_map,
ColumnRawPtrs& raw_ptrs,
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 76fa064903..9e1f61050a 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -207,11 +207,13 @@ public:
Status alloc_resource(RuntimeState* state) override;
void release_resource(RuntimeState* state) override;
Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override;
- bool need_more_input_data();
+ bool need_more_input_data() const;
Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override;
void prepare_for_next() override;
+ void debug_string(int indentation_level, std::stringstream* out) const override;
+
private:
using VExprContexts = std::vector<VExprContext*>;
// probe expr
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 7fc43fcf19..5bebcd0d10 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -611,8 +611,9 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) {
void VNestedLoopJoinNode::debug_string(int indentation_level, std::stringstream* out) const {
*out << std::string(indentation_level * 2, ' ');
*out << "VNestedLoopJoinNode";
- *out << "(eos=" << (_matched_rows_done ? "true" : "false")
+ *out << "(need_more_input_data=" << (need_more_input_data() ? "true" : "false")
<< " left_block_pos=" << _left_block_pos;
+ *out << ")\n children=";
VJoinNodeBase::debug_string(indentation_level, out);
*out << ")";
}
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index 01db7d29ef..882619930e 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -223,7 +223,7 @@ Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block, bo
return Status::OK();
}
-bool VRepeatNode::need_more_input_data() {
+bool VRepeatNode::need_more_input_data() const {
return !_child_block.rows() && !_child_eos;
}
diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h
index 394690d729..5f038c606e 100644
--- a/be/src/vec/exec/vrepeat_node.h
+++ b/be/src/vec/exec/vrepeat_node.h
@@ -45,11 +45,10 @@ public:
Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override;
- bool need_more_input_data();
+ bool need_more_input_data() const;
Block* get_child_block() { return &_child_block; }
-protected:
- virtual void debug_string(int indentation_level, std::stringstream* out) const override;
+ void debug_string(int indentation_level, std::stringstream* out) const override;
private:
Status get_repeated_block(Block* child_block, int repeat_id_idx, Block* output_block);
diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h
index 85c0fca5be..85eccc047a 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -37,8 +37,7 @@ public:
return _children[0]->open(state);
}
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
-
- bool need_more_input_data() { return !_child_block.rows() && !_child_eos; }
+ bool need_more_input_data() const { return !_child_block.rows() && !_child_eos; }
void release_resource(doris::RuntimeState* state) override {
Expr::close(_fn_ctxs, state);
diff --git a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy b/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
index 4369a1da1d..9b15ffb686 100644
--- a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
+++ b/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy
@@ -25,6 +25,6 @@ suite("test_split_part") {
where
split_part("bCKHDX07at", "5.7.37", cast(name as int)) is not null;
"""
- exception "errCode = 2, detailMessage = [RUNTIME_ERROR]Argument at index 3 for function split_part must be constant"
+ exception "[RUNTIME_ERROR]Argument at index 3 for function split_part must be constant"
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org