You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/12/09 01:08:36 UTC
[doris] branch master updated: [pipeline](refactor) do some refactor for code and comments (#14934)
This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b311ebef6c [pipeline](refactor) do some refactor for code and comments (#14934)
b311ebef6c is described below
commit b311ebef6cae6496ed1a6053b8f22e9845476cd6
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Dec 9 09:08:28 2022 +0800
[pipeline](refactor) do some refactor for code and comments (#14934)
---
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 2 +-
be/src/pipeline/exec/hashjoin_probe_operator.h | 2 +-
be/src/pipeline/exec/operator.h | 107 +++++++++++++++--------
be/src/pipeline/exec/repeat_operator.cpp | 2 +-
be/src/pipeline/exec/repeat_operator.h | 2 +-
be/src/pipeline/exec/table_function_operator.cpp | 2 +-
be/src/pipeline/exec/table_function_operator.h | 2 +-
be/src/pipeline/task_scheduler.cpp | 2 +-
8 files changed, 76 insertions(+), 45 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index beddf6d655..91cef6d915 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -22,7 +22,7 @@
namespace doris {
namespace pipeline {
-OPERATOR_CODE_GENERATOR(HashJoinProbeOperator, DataStateOperator)
+OPERATOR_CODE_GENERATOR(HashJoinProbeOperator, StatefulOperator)
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index ed2abd0663..7ea3e47546 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -32,7 +32,7 @@ public:
OperatorPtr build_operator() override;
};
-class HashJoinProbeOperator final : public DataStateOperator<HashJoinProbeOperatorBuilder> {
+class HashJoinProbeOperator final : public StatefulOperator<HashJoinProbeOperatorBuilder> {
public:
HashJoinProbeOperator(OperatorBuilderBase*, ExecNode*);
// if exec node split to: sink, source operator. the source operator
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 9f98be1a7a..f1e29f0547 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -36,19 +36,33 @@
namespace doris::pipeline {
-// Result of source pull data, init state is DEPEND_ON_SOURCE
+/**
+ * State of source operator.
+ * |------> MORE_DATA ------|
+ * | ^ | |
+ * DEPEND_ON_SOURCE ----| |----| |----> FINISHED
+ * ^ | |------------------------|
+ * |-------|
+ */
enum class SourceState : uint8_t {
- DEPEND_ON_SOURCE = 0, // Operator has no more data in itself, needs to read from source.
- MORE_DATA = 1, // Still have data can read
+ DEPEND_ON_SOURCE = 0, // Need more data from source.
+ MORE_DATA = 1, // Has more data to output. (e.g. RepeatNode)
FINISHED = 2
};
+/**
+ * State of sink operator.
+ * |------> SINK_BUSY ------|
+ * | ^ | |
+ * SINK_IDLE --------| |----| |----> FINISHED
+ * ^ | |------------------------|
+ * |-------|
+ */
enum class SinkState : uint8_t {
- SINK_IDLE = 0, // can send block to sink
- SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
+ SINK_IDLE = 0, // Can send block to sink.
+ SINK_BUSY = 1, // Sink buffer is full, sink operator is blocked until buffer is freed.
FINISHED = 2
};
-//////////////// DO NOT USE THE UP State ////////////////
class OperatorBuilderBase;
class OperatorBase;
@@ -70,10 +84,8 @@ public:
virtual bool is_sink() const { return false; }
virtual bool is_source() const { return false; }
- // create the object used by all operator
virtual Status prepare(RuntimeState* state);
- // destory the object used by all operator
virtual void close(RuntimeState* state);
std::string get_name() const { return _name; }
@@ -131,32 +143,30 @@ public:
explicit OperatorBase(OperatorBuilderBase* operator_builder);
virtual ~OperatorBase() = default;
- // After both sink and source need to know the cancel state.
- // do cancel work
bool is_sink() const;
bool is_source() const;
- // Only result sink and data stream sink need to impl the virtual function
virtual Status init(const TDataSink& tsink) { return Status::OK(); };
- // Do prepare some state of Operator
+ // Prepare for running. (e.g. resource allocation, etc.)
virtual Status prepare(RuntimeState* state) = 0;
- // Like ExecNode,when pipeline task first time be scheduled, can't block
- // the pipeline should be open after dependencies is finish
- // Eg a -> c, b-> c, after a, b pipeline finish, c pipeline should call open
- // Now the pipeline only have one task, so the there is no performance bottleneck for the mechanism,
- // but if one pipeline have multi task to parallel work, need to rethink the logic
- //
- // Each operator should call alloc_resource() to prepare resource to do data compute.
- // if ExecNode split to sink and source operator, alloc_resource() should be called in sink operator
+ /**
+ * Allocate resources needed by this operator.
+ *
+ * This is called when current pipeline is scheduled first time.
+ * e.g. If we have three pipelines and the dependencies are A -> C, B -> C, all operators' `open`
+ * method in pipeline C will be called once pipelines A and B have finished.
+ *
+ * Currently there is only one task per pipeline, so this causes no problem.
+ * But if one pipeline has multiple tasks running in parallel, we need to rethink this logic.
+ */
virtual Status open(RuntimeState* state) = 0;
- // Release the resource, should not block the thread
- //
- // Each operator should call close_self() to release resource
- // if ExecNode split to sink and source operator, close_self() should be called in source operator
+ /**
+ * Release all resources once this operator has done its work.
+ */
virtual Status close(RuntimeState* state) = 0;
Status set_child(OperatorPtr child) {
@@ -171,13 +181,19 @@ public:
virtual bool can_write() { return false; } // for sink
- // for pipeline
+ /**
+ * The main method to execute a pipeline task.
+ * Now it is a pull-based pipeline and each operator pulls data from its child by this method.
+ */
virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
SourceState& result_state) {
return Status::OK();
};
- // return can write continue
+ /**
+ * Push data to the sink operator.
+ * Data in this block will be sent by RPC or written to somewhere finally.
+ */
virtual Status sink(RuntimeState* state, vectorized::Block* block,
SourceState source_state) = 0;
@@ -187,15 +203,16 @@ public:
return Status::NotSupported(error_msg.str());
}
- // close be called
- // - Source: scan thread do not exist
- // - Sink: RPC do not be disposed
- // - else return false
+ /**
+ * pending_finish means we have called `close` but there is still some work to do before
+ * this operator can finish.
+ *
+ * For source operator, it is pending_finish iff scan threads have not been released yet
+ * For sink operator, it is pending_finish iff RPC resources have not been released yet
+ * Otherwise, it will return false.
+ */
virtual bool is_pending_finish() const { return false; }
- // TODO: should we keep the function
- // virtual bool is_finished() = 0;
-
bool is_closed() const { return _is_closed; }
MemTracker* mem_tracker() const { return _mem_tracker.get(); }
@@ -211,8 +228,6 @@ protected:
std::unique_ptr<MemTracker> _mem_tracker;
OperatorBuilderBase* _operator_builder;
- // source has no child
- // if an operator is not source, it will get data from its child.
OperatorPtr _child;
std::unique_ptr<RuntimeProfile> _runtime_profile;
@@ -223,6 +238,11 @@ private:
bool _is_closed = false;
};
+/**
+ * All operators inherited from DataSinkOperator will hold a DataSink inside. Namely, it is a one-to-one relation between DataSinkOperator and DataSink.
+ *
+ * It should be mentioned that, not all SinkOperators are inherited from this (e.g. SortSinkOperator which holds a sort node inside instead of a DataSink).
+ */
template <typename OperatorBuilderType>
class DataSinkOperator : public OperatorBase {
public:
@@ -275,6 +295,9 @@ protected:
NodeType* _sink;
};
+/**
+ * All operators inherited from Operator will hold an ExecNode inside.
+ */
template <typename OperatorBuilderType>
class Operator : public OperatorBase {
public:
@@ -337,18 +360,26 @@ protected:
NodeType* _node;
};
+/**
+ * StatefulOperator indicates the operators with some states inside.
+ *
+ * Specifically, we call an operator stateful if it can determine its output by itself.
+ * For example, hash join probe operator is a typical StatefulOperator. When it gets a block from probe side, it will hold this block inside (e.g. _child_block).
+ * If there are still remaining rows in the probe block, we can get an output block by calling `get_block` without any data from its child.
+ * In a nutshell, it is a one-to-many relation between input blocks and output blocks for StatefulOperator.
+ */
template <typename OperatorBuilderType>
-class DataStateOperator : public Operator<OperatorBuilderType> {
+class StatefulOperator : public Operator<OperatorBuilderType> {
public:
using NodeType =
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
- DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+ StatefulOperator(OperatorBuilderBase* builder, ExecNode* node)
: Operator<OperatorBuilderType>(builder, node),
_child_block(new vectorized::Block),
_child_source_state(SourceState::DEPEND_ON_SOURCE) {};
- virtual ~DataStateOperator() = default;
+ virtual ~StatefulOperator() = default;
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override {
diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp
index 0af15b2a31..def1f6da9d 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -21,6 +21,6 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(RepeatOperator, DataStateOperator)
+OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator)
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h
index 5254a47d2a..15707ea39c 100644
--- a/be/src/pipeline/exec/repeat_operator.h
+++ b/be/src/pipeline/exec/repeat_operator.h
@@ -34,7 +34,7 @@ public:
OperatorPtr build_operator() override;
};
-class RepeatOperator final : public DataStateOperator<RepeatOperatorBuilder> {
+class RepeatOperator final : public StatefulOperator<RepeatOperatorBuilder> {
public:
RepeatOperator(OperatorBuilderBase* operator_builder, ExecNode* repeat_node);
};
diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp
index be8dd0f131..8146dc2ea0 100644
--- a/be/src/pipeline/exec/table_function_operator.cpp
+++ b/be/src/pipeline/exec/table_function_operator.cpp
@@ -19,6 +19,6 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(TableFunctionOperator, DataStateOperator)
+OPERATOR_CODE_GENERATOR(TableFunctionOperator, StatefulOperator)
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h
index f106abab07..f2c0437101 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -29,7 +29,7 @@ public:
OperatorPtr build_operator() override;
};
-class TableFunctionOperator final : public DataStateOperator<TableFunctionOperatorBuilder> {
+class TableFunctionOperator final : public StatefulOperator<TableFunctionOperatorBuilder> {
public:
TableFunctionOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
};
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 051775f094..5c146cd3f5 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -211,7 +211,7 @@ void TaskScheduler::_do_work(size_t index) {
if (!task) {
task = queue->steal_take(index);
if (!task) {
- // TODO: The take is a stock method, rethink the logic
+ // TODO: The take is a blocking method, rethink the logic
task = queue->take(index);
if (!task) {
continue;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org