You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/12/07 00:47:42 UTC

[doris] branch master updated: [pipeline](dategen) Support datagen node (#14835)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new df2b190559 [pipeline](dategen) Support datagen node (#14835)
df2b190559 is described below

commit df2b1905594be71f2dd987b7f20369eec6ccd97f
Author: Gabriel <ga...@gmail.com>
AuthorDate: Wed Dec 7 08:47:35 2022 +0800

    [pipeline](dategen) Support datagen node (#14835)
---
 be/src/pipeline/CMakeLists.txt                |  1 +
 be/src/pipeline/exec/datagen_operator.cpp     | 38 +++++++++++++++++++++
 be/src/pipeline/exec/datagen_operator.h       | 48 +++++++++++++++++++++++++++
 be/src/pipeline/exec/operator.h               |  1 +
 be/src/pipeline/pipeline_fragment_context.cpp |  7 ++++
 5 files changed, 95 insertions(+)

diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt
index 436d4e70f2..c3a06c9f1f 100644
--- a/be/src/pipeline/CMakeLists.txt
+++ b/be/src/pipeline/CMakeLists.txt
@@ -28,6 +28,7 @@ set(PIPELINE_FILES
         task_scheduler.cpp
         exec/operator.cpp
         exec/scan_operator.cpp
+        exec/datagen_operator.cpp
         exec/empty_set_operator.cpp
         exec/exchange_source_operator.cpp
         exec/exchange_sink_operator.cpp
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
new file mode 100644
index 0000000000..729096f803
--- /dev/null
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "datagen_operator.h"
+
+#include "vec/exec/vdata_gen_scan_node.h"
+
+namespace doris::pipeline {
+
+OPERATOR_CODE_GENERATOR(DataGenOperator, Operator)
+
+Status DataGenOperator::open(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(Operator::open(state));
+    return _node->open(state);
+}
+
+Status DataGenOperator::close(RuntimeState* state) {
+    RETURN_IF_ERROR(Operator::close(state));
+    _node->close(state);
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h
new file mode 100644
index 0000000000..9d418a0284
--- /dev/null
+++ b/be/src/pipeline/exec/datagen_operator.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+#include "operator.h"
+
+namespace doris::vectorized {
+class VDataGenFunctionScanNode;
+} // namespace doris::vectorized
+
+namespace doris::pipeline {
+
+class DataGenOperatorBuilder : public OperatorBuilder<vectorized::VDataGenFunctionScanNode> {
+public:
+    DataGenOperatorBuilder(int32_t id, ExecNode* exec_node);
+    bool is_source() const override { return true; }
+    OperatorPtr build_operator() override;
+};
+
+class DataGenOperator : public Operator<DataGenOperatorBuilder> {
+public:
+    DataGenOperator(OperatorBuilderBase* operator_builder, ExecNode* datagen_node);
+
+    bool can_read() override { return true; };
+
+    Status open(RuntimeState* state) override;
+
+    Status close(RuntimeState* state) override;
+};
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index d821103aa1..461848d54a 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -23,6 +23,7 @@
 #include "exec/exec_node.h"
 #include "runtime/runtime_state.h"
 #include "vec/core/block.h"
+#include "vec/exec/vdata_gen_scan_node.h"
 
 #define OPERATOR_CODE_GENERATOR(NAME, SUBCLASS)                                                 \
     NAME##Builder::NAME##Builder(int32_t id, ExecNode* exec_node)                               \
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 0c2793c444..75cd7dbc3e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -23,6 +23,7 @@
 #include "exec/aggregation_sink_operator.h"
 #include "exec/aggregation_source_operator.h"
 #include "exec/data_sink.h"
+#include "exec/datagen_operator.h"
 #include "exec/empty_set_operator.h"
 #include "exec/exchange_sink_operator.h"
 #include "exec/exchange_source_operator.h"
@@ -298,6 +299,12 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
         break;
     }
+    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
+        OperatorBuilderPtr operator_t =
+                std::make_shared<DataGenOperatorBuilder>(next_operator_builder_id(), node);
+        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
+        break;
+    }
     case TPlanNodeType::AGGREGATION_NODE: {
         auto* agg_node = assert_cast<vectorized::AggregationNode*>(node);
         auto new_pipe = add_pipeline();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org