You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/12/07 00:47:42 UTC
[doris] branch master updated: [pipeline](dategen) Support datagen node (#14835)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new df2b190559 [pipeline](dategen) Support datagen node (#14835)
df2b190559 is described below
commit df2b1905594be71f2dd987b7f20369eec6ccd97f
Author: Gabriel <ga...@gmail.com>
AuthorDate: Wed Dec 7 08:47:35 2022 +0800
[pipeline](dategen) Support datagen node (#14835)
---
be/src/pipeline/CMakeLists.txt | 1 +
be/src/pipeline/exec/datagen_operator.cpp | 38 +++++++++++++++++++++
be/src/pipeline/exec/datagen_operator.h | 48 +++++++++++++++++++++++++++
be/src/pipeline/exec/operator.h | 1 +
be/src/pipeline/pipeline_fragment_context.cpp | 7 ++++
5 files changed, 95 insertions(+)
diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt
index 436d4e70f2..c3a06c9f1f 100644
--- a/be/src/pipeline/CMakeLists.txt
+++ b/be/src/pipeline/CMakeLists.txt
@@ -28,6 +28,7 @@ set(PIPELINE_FILES
task_scheduler.cpp
exec/operator.cpp
exec/scan_operator.cpp
+ exec/datagen_operator.cpp
exec/empty_set_operator.cpp
exec/exchange_source_operator.cpp
exec/exchange_sink_operator.cpp
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
new file mode 100644
index 0000000000..729096f803
--- /dev/null
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "datagen_operator.h"
+
+#include "vec/exec/vdata_gen_scan_node.h"
+
+namespace doris::pipeline {
+
+// Expands the shared operator boilerplate for this operator. The macro (see
+// operator.h) begins by defining the DataGenOperatorBuilder(int32_t, ExecNode*)
+// constructor declared in datagen_operator.h.
+// NOTE(review): presumably it also defines build_operator() and the
+// DataGenOperator constructor, since neither is defined in this file; the rest
+// of the macro body is not visible here - confirm against operator.h.
+OPERATOR_CODE_GENERATOR(DataGenOperator, Operator)
+
+// Opens the operator: runs the base Operator::open() and then opens the wrapped
+// data-gen node. The first failing Status is returned to the caller; otherwise
+// the result of the node's open() is returned.
+Status DataGenOperator::open(RuntimeState* state) {
+ // Attribute the whole call to this operator's total-time counter.
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+
+ // The base class must open successfully before the node is touched.
+ RETURN_IF_ERROR(Operator::open(state));
+
+ Status node_status = _node->open(state);
+ return node_status;
+}
+
+// Closes the operator: runs the base Operator::close() and then closes the
+// wrapped data-gen node.
+//
+// Returns the first failing Status. If the base close fails, the node's close()
+// is not attempted (unchanged from the original control flow).
+Status DataGenOperator::close(RuntimeState* state) {
+ RETURN_IF_ERROR(Operator::close(state));
+ // Propagate the node's close status instead of discarding it and reporting
+ // OK unconditionally, so a failed close is visible to the caller - mirroring
+ // how open() forwards the node's status.
+ return _node->close(state);
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h
new file mode 100644
index 0000000000..9d418a0284
--- /dev/null
+++ b/be/src/pipeline/exec/datagen_operator.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+#include "operator.h"
+
+namespace doris::vectorized {
+class VDataGenFunctionScanNode;
+} // namespace doris::vectorized
+
+namespace doris::pipeline {
+
+// Builder for DataGenOperator, bound to a vectorized::VDataGenFunctionScanNode.
+// PipelineFragmentContext::_build_pipelines creates one of these for plan nodes
+// of type TPlanNodeType::DATA_GEN_SCAN_NODE.
+class DataGenOperatorBuilder : public OperatorBuilder<vectorized::VDataGenFunctionScanNode> {
+public:
+ // id: operator-builder id assigned by the fragment context.
+ // exec_node: the exec node this builder's operators will wrap.
+ DataGenOperatorBuilder(int32_t id, ExecNode* exec_node);
+
+ // Creates the operator instance for this builder (defined out of line).
+ OperatorPtr build_operator() override;
+
+ // This builder always reports itself as a source.
+ bool is_source() const override { return true; }
+};
+
+// Pipeline operator that wraps a data-gen exec node. open() and close() are
+// implemented in datagen_operator.cpp and forward to the wrapped node after
+// calling the base Operator implementation.
+class DataGenOperator : public Operator<DataGenOperatorBuilder> {
+public:
+ // operator_builder: the builder that created this operator.
+ // datagen_node: the exec node this operator wraps.
+ DataGenOperator(OperatorBuilderBase* operator_builder, ExecNode* datagen_node);
+
+ // Lifecycle hooks; each returns the Status of the underlying calls.
+ Status open(RuntimeState* state) override;
+ Status close(RuntimeState* state) override;
+
+ // Unconditionally reports that the operator can be read from.
+ bool can_read() override { return true; }
+};
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index d821103aa1..461848d54a 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -23,6 +23,7 @@
#include "exec/exec_node.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
+#include "vec/exec/vdata_gen_scan_node.h"
#define OPERATOR_CODE_GENERATOR(NAME, SUBCLASS) \
NAME##Builder::NAME##Builder(int32_t id, ExecNode* exec_node) \
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 0c2793c444..75cd7dbc3e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -23,6 +23,7 @@
#include "exec/aggregation_sink_operator.h"
#include "exec/aggregation_source_operator.h"
#include "exec/data_sink.h"
+#include "exec/datagen_operator.h"
#include "exec/empty_set_operator.h"
#include "exec/exchange_sink_operator.h"
#include "exec/exchange_source_operator.h"
@@ -298,6 +299,12 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
break;
}
+ case TPlanNodeType::DATA_GEN_SCAN_NODE: {
+ OperatorBuilderPtr operator_t =
+ std::make_shared<DataGenOperatorBuilder>(next_operator_builder_id(), node);
+ RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
+ break;
+ }
case TPlanNodeType::AGGREGATION_NODE: {
auto* agg_node = assert_cast<vectorized::AggregationNode*>(node);
auto new_pipe = add_pipeline();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org