You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/12/09 01:39:37 UTC
[doris] branch master updated: [vectorized](pipeline) support assert num rows operator (#14923)
This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 20f2abb3d4 [vectorized](pipeline) support assert num rows operator (#14923)
20f2abb3d4 is described below
commit 20f2abb3d4ce5cd1a013eef3c8dec6835a757a4b
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Fri Dec 9 09:39:29 2022 +0800
[vectorized](pipeline) support assert num rows operator (#14923)
---
.../exec/assert_num_rows_operator.h} | 40 ++++++++++++----------
be/src/pipeline/pipeline_fragment_context.cpp | 9 +++++
be/src/vec/exec/vassert_num_rows_node.cpp | 19 +++++-----
be/src/vec/exec/vassert_num_rows_node.h | 6 ++--
4 files changed, 46 insertions(+), 28 deletions(-)
diff --git a/be/src/vec/exec/vassert_num_rows_node.h b/be/src/pipeline/exec/assert_num_rows_operator.h
similarity index 50%
copy from be/src/vec/exec/vassert_num_rows_node.h
copy to be/src/pipeline/exec/assert_num_rows_operator.h
index cce963f25f..60bbc7c49c 100644
--- a/be/src/vec/exec/vassert_num_rows_node.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -15,28 +15,32 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/exec_node.h"
-#include "gen_cpp/PlanNodes_types.h"
+#pragma once
-namespace doris::vectorized {
-class Block;
+#include "operator.h"
+#include "vec/exec/vassert_num_rows_node.h"
-// Node for assert row count
-class VAssertNumRowsNode : public ExecNode {
-public:
- VAssertNumRowsNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+namespace doris {
+
+namespace pipeline {
- Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
- return Status::NotSupported("Not Implemented VAnalyticEvalNode::get_next.");
- }
+// Pipeline operator builder for vectorized::VAssertNumRowsNode.
+class AssertNumRowsOperatorBuilder final : public OperatorBuilder<vectorized::VAssertNumRowsNode> {
+public:
+    // Forwards `id` and `node` to the OperatorBuilder base together with the
+    // fixed name "AssertNumRowsOperatorBuilder".
+    AssertNumRowsOperatorBuilder(int32_t id, ExecNode* node)
+            : OperatorBuilder(id, "AssertNumRowsOperatorBuilder", node) {}
- virtual Status open(RuntimeState* state) override;
- virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+    // Creates the operator for this builder; defined after AssertNumRowsOperator
+    // because it needs the complete operator type.
+    OperatorPtr build_operator() override;
+};
-private:
- int64_t _desired_num_rows;
- const std::string _subquery_string;
- TAssertion::type _assertion;
+// Pipeline operator produced by AssertNumRowsOperatorBuilder. It declares no
+// members of its own beyond the constructor; everything else is inherited from
+// the Operator<AssertNumRowsOperatorBuilder> base.
+class AssertNumRowsOperator final : public Operator<AssertNumRowsOperatorBuilder> {
+public:
+    // Forwards the builder and the wrapped exec node to the Operator base.
+    AssertNumRowsOperator(OperatorBuilderBase* operator_builder, ExecNode* node)
+            : Operator(operator_builder, node) {}
};
-} // namespace doris::vectorized
+// Builds the operator for this builder: a new AssertNumRowsOperator constructed
+// from `this` and the builder's `_node`, allocated with std::make_shared.
+//
+// Marked `inline` because the definition lives in a header: without it, every
+// translation unit that includes this file would emit its own external
+// definition and linking would fail with a multiple-definition (ODR) error.
+inline OperatorPtr AssertNumRowsOperatorBuilder::build_operator() {
+    return std::make_shared<AssertNumRowsOperator>(this, _node);
+}
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 68ca3a8d7c..2f497dde41 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -39,7 +39,9 @@
#include "exec/streaming_aggregation_source_operator.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService_types.h"
+#include "pipeline/exec/assert_num_rows_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
+#include "pipeline/exec/operator.h"
#include "pipeline/exec/table_function_operator.h"
#include "pipeline_task.h"
#include "runtime/client_cache.h"
@@ -351,6 +353,13 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(cur_pipe->add_operator(builder));
break;
}
+ case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
+ RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
+ OperatorBuilderPtr builder =
+ std::make_shared<AssertNumRowsOperatorBuilder>(next_operator_builder_id(), node);
+ RETURN_IF_ERROR(cur_pipe->add_operator(builder));
+ break;
+ }
case TPlanNodeType::TABLE_FUNCTION_NODE: {
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr builder =
diff --git a/be/src/vec/exec/vassert_num_rows_node.cpp b/be/src/vec/exec/vassert_num_rows_node.cpp
index b9239a3c53..0285acf56a 100644
--- a/be/src/vec/exec/vassert_num_rows_node.cpp
+++ b/be/src/vec/exec/vassert_num_rows_node.cpp
@@ -18,8 +18,6 @@
#include "vec/exec/vassert_num_rows_node.h"
#include "gen_cpp/PlanNodes_types.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
@@ -47,12 +45,7 @@ Status VAssertNumRowsNode::open(RuntimeState* state) {
return Status::OK();
}
-Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos) {
- INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
- "VAssertNumRowsNode::get_next");
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, block, eos),
- child(0)->get_next_span(), *eos);
+Status VAssertNumRowsNode::pull(doris::RuntimeState* state, vectorized::Block* block, bool* eos) {
_num_rows_returned += block->rows();
bool assert_res = false;
switch (_assertion) {
@@ -98,4 +91,14 @@ Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos
return Status::OK();
}
+Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos) { // Fetches a block from child(0), then delegates to pull().
+ INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
+ "VAssertNumRowsNode::get_next");
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+ RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, block, eos), // child fills *block and *eos
+ child(0)->get_next_span(), *eos);
+
+ return pull(state, block, eos); // the row-count assertion itself is implemented in pull()
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vassert_num_rows_node.h b/be/src/vec/exec/vassert_num_rows_node.h
index cce963f25f..0f6ffcb9de 100644
--- a/be/src/vec/exec/vassert_num_rows_node.h
+++ b/be/src/vec/exec/vassert_num_rows_node.h
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include "common/status.h"
#include "exec/exec_node.h"
#include "gen_cpp/PlanNodes_types.h"
@@ -30,8 +31,9 @@ public:
return Status::NotSupported("Not Implemented VAnalyticEvalNode::get_next.");
}
- virtual Status open(RuntimeState* state) override;
- virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+ Status open(RuntimeState* state) override;
+ Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+ Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
private:
int64_t _desired_num_rows;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org