You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/12/09 01:39:37 UTC

[doris] branch master updated: [vectorized](pipeline) support assert num rows operator (#14923)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 20f2abb3d4 [vectorized](pipeline) support assert num rows operator (#14923)
20f2abb3d4 is described below

commit 20f2abb3d4ce5cd1a013eef3c8dec6835a757a4b
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Fri Dec 9 09:39:29 2022 +0800

    [vectorized](pipeline) support assert num rows operator (#14923)
---
 .../exec/assert_num_rows_operator.h}               | 40 ++++++++++++----------
 be/src/pipeline/pipeline_fragment_context.cpp      |  9 +++++
 be/src/vec/exec/vassert_num_rows_node.cpp          | 19 +++++-----
 be/src/vec/exec/vassert_num_rows_node.h            |  6 ++--
 4 files changed, 46 insertions(+), 28 deletions(-)

diff --git a/be/src/vec/exec/vassert_num_rows_node.h b/be/src/pipeline/exec/assert_num_rows_operator.h
similarity index 50%
copy from be/src/vec/exec/vassert_num_rows_node.h
copy to be/src/pipeline/exec/assert_num_rows_operator.h
index cce963f25f..60bbc7c49c 100644
--- a/be/src/vec/exec/vassert_num_rows_node.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -15,28 +15,32 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/exec_node.h"
-#include "gen_cpp/PlanNodes_types.h"
+#pragma once
 
-namespace doris::vectorized {
-class Block;
+#include "operator.h"
+#include "vec/exec/vassert_num_rows_node.h"
 
-// Node for assert row count
-class VAssertNumRowsNode : public ExecNode {
-public:
-    VAssertNumRowsNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+namespace doris {
+
+namespace pipeline {
 
-    Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
-        return Status::NotSupported("Not Implemented VAnalyticEvalNode::get_next.");
-    }
+class AssertNumRowsOperatorBuilder final : public OperatorBuilder<vectorized::VAssertNumRowsNode> {
+public:
+    AssertNumRowsOperatorBuilder(int32_t id, ExecNode* node)
+            : OperatorBuilder(id, "AssertNumRowsOperatorBuilder", node) {};
 
-    virtual Status open(RuntimeState* state) override;
-    virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+    OperatorPtr build_operator() override;
+};
 
-private:
-    int64_t _desired_num_rows;
-    const std::string _subquery_string;
-    TAssertion::type _assertion;
+class AssertNumRowsOperator final : public Operator<AssertNumRowsOperatorBuilder> {
+public:
+    AssertNumRowsOperator(OperatorBuilderBase* operator_builder, ExecNode* node)
+            : Operator(operator_builder, node) {};
 };
 
-} // namespace doris::vectorized
+OperatorPtr AssertNumRowsOperatorBuilder::build_operator() {
+    return std::make_shared<AssertNumRowsOperator>(this, _node);
+}
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 68ca3a8d7c..2f497dde41 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -39,7 +39,9 @@
 #include "exec/streaming_aggregation_source_operator.h"
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/HeartbeatService_types.h"
+#include "pipeline/exec/assert_num_rows_operator.h"
 #include "pipeline/exec/olap_table_sink_operator.h"
+#include "pipeline/exec/operator.h"
 #include "pipeline/exec/table_function_operator.h"
 #include "pipeline_task.h"
 #include "runtime/client_cache.h"
@@ -351,6 +353,13 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         RETURN_IF_ERROR(cur_pipe->add_operator(builder));
         break;
     }
+    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
+        RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
+        OperatorBuilderPtr builder =
+                std::make_shared<AssertNumRowsOperatorBuilder>(next_operator_builder_id(), node);
+        RETURN_IF_ERROR(cur_pipe->add_operator(builder));
+        break;
+    }
     case TPlanNodeType::TABLE_FUNCTION_NODE: {
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
         OperatorBuilderPtr builder =
diff --git a/be/src/vec/exec/vassert_num_rows_node.cpp b/be/src/vec/exec/vassert_num_rows_node.cpp
index b9239a3c53..0285acf56a 100644
--- a/be/src/vec/exec/vassert_num_rows_node.cpp
+++ b/be/src/vec/exec/vassert_num_rows_node.cpp
@@ -18,8 +18,6 @@
 #include "vec/exec/vassert_num_rows_node.h"
 
 #include "gen_cpp/PlanNodes_types.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
@@ -47,12 +45,7 @@ Status VAssertNumRowsNode::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos) {
-    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
-                                 "VAssertNumRowsNode::get_next");
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, block, eos),
-                                   child(0)->get_next_span(), *eos);
+Status VAssertNumRowsNode::pull(doris::RuntimeState* state, vectorized::Block* block, bool* eos) {
     _num_rows_returned += block->rows();
     bool assert_res = false;
     switch (_assertion) {
@@ -98,4 +91,14 @@ Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos
     return Status::OK();
 }
 
+Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
+                                 "VAssertNumRowsNode::get_next");
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, block, eos),
+                                   child(0)->get_next_span(), *eos);
+
+    return pull(state, block, eos);
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vassert_num_rows_node.h b/be/src/vec/exec/vassert_num_rows_node.h
index cce963f25f..0f6ffcb9de 100644
--- a/be/src/vec/exec/vassert_num_rows_node.h
+++ b/be/src/vec/exec/vassert_num_rows_node.h
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "common/status.h"
 #include "exec/exec_node.h"
 #include "gen_cpp/PlanNodes_types.h"
 
@@ -30,8 +31,9 @@ public:
         return Status::NotSupported("Not Implemented VAnalyticEvalNode::get_next.");
     }
 
-    virtual Status open(RuntimeState* state) override;
-    virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+    Status open(RuntimeState* state) override;
+    Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+    Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
 
 private:
     int64_t _desired_num_rows;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org