You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/12/09 01:08:36 UTC

[doris] branch master updated: [pipeline](refactor) do some refactor for code and comments (#14934)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b311ebef6c [pipeline](refactor) do some refactor for code and comments (#14934)
b311ebef6c is described below

commit b311ebef6cae6496ed1a6053b8f22e9845476cd6
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Dec 9 09:08:28 2022 +0800

    [pipeline](refactor) do some refactor for code and comments (#14934)
---
 be/src/pipeline/exec/hashjoin_probe_operator.cpp |   2 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h   |   2 +-
 be/src/pipeline/exec/operator.h                  | 107 +++++++++++++++--------
 be/src/pipeline/exec/repeat_operator.cpp         |   2 +-
 be/src/pipeline/exec/repeat_operator.h           |   2 +-
 be/src/pipeline/exec/table_function_operator.cpp |   2 +-
 be/src/pipeline/exec/table_function_operator.h   |   2 +-
 be/src/pipeline/task_scheduler.cpp               |   2 +-
 8 files changed, 76 insertions(+), 45 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index beddf6d655..91cef6d915 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -22,7 +22,7 @@
 namespace doris {
 namespace pipeline {
 
-OPERATOR_CODE_GENERATOR(HashJoinProbeOperator, DataStateOperator)
+OPERATOR_CODE_GENERATOR(HashJoinProbeOperator, StatefulOperator)
 
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index ed2abd0663..7ea3e47546 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -32,7 +32,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class HashJoinProbeOperator final : public DataStateOperator<HashJoinProbeOperatorBuilder> {
+class HashJoinProbeOperator final : public StatefulOperator<HashJoinProbeOperatorBuilder> {
 public:
     HashJoinProbeOperator(OperatorBuilderBase*, ExecNode*);
     // if exec node split to: sink, source operator. the source operator
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 9f98be1a7a..f1e29f0547 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -36,19 +36,33 @@
 
 namespace doris::pipeline {
 
-// Result of source pull data, init state is DEPEND_ON_SOURCE
+/**
+ * State of source operator.
+ *                      |------> MORE_DATA ------|
+ *                      |         ^    |         |
+ * DEPEND_ON_SOURCE ----|         |----|         |----> FINISHED
+ *    ^       |         |------------------------|
+ *    |-------|
+ */
 enum class SourceState : uint8_t {
-    DEPEND_ON_SOURCE = 0, // Operator has no more data in itself, needs to read from source.
-    MORE_DATA = 1,        // Still have data can read
+    DEPEND_ON_SOURCE = 0, // Need more data from source.
+    MORE_DATA = 1,        // Has more data to output. (e.g. RepeatNode)
     FINISHED = 2
 };
 
+/**
+ * State of sink operator.
+ *                     |------> SINK_BUSY ------|
+ *                     |         ^    |         |
+ *   SINK_IDLE --------|         |----|         |----> FINISHED
+ *   ^       |         |------------------------|
+ *   |-------|
+ */
 enum class SinkState : uint8_t {
-    SINK_IDLE = 0, // can send block to sink
-    SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
+    SINK_IDLE = 0, // Can send block to sink.
+    SINK_BUSY = 1, // Sink buffer is full, sink operator is blocked until buffer is freed.
     FINISHED = 2
 };
-////////////////       DO NOT USE THE UP State     ////////////////
 
 class OperatorBuilderBase;
 class OperatorBase;
@@ -70,10 +84,8 @@ public:
     virtual bool is_sink() const { return false; }
     virtual bool is_source() const { return false; }
 
-    // create the object used by all operator
     virtual Status prepare(RuntimeState* state);
 
-    // destory the object used by all operator
     virtual void close(RuntimeState* state);
 
     std::string get_name() const { return _name; }
@@ -131,32 +143,30 @@ public:
     explicit OperatorBase(OperatorBuilderBase* operator_builder);
     virtual ~OperatorBase() = default;
 
-    // After both sink and source need to know the cancel state.
-    // do cancel work
     bool is_sink() const;
 
     bool is_source() const;
 
-    // Only result sink and data stream sink need to impl the virtual function
     virtual Status init(const TDataSink& tsink) { return Status::OK(); };
 
-    // Do prepare some state of Operator
+    // Prepare for running. (e.g. resource allocation, etc.)
     virtual Status prepare(RuntimeState* state) = 0;
 
-    // Like ExecNode,when pipeline task first time be scheduled, can't block
-    // the pipeline should be open after dependencies is finish
-    // Eg a -> c, b-> c, after a, b pipeline finish, c pipeline should call open
-    // Now the pipeline only have one task, so the there is no performance bottleneck for the mechanism,
-    // but if one pipeline have multi task to parallel work, need to rethink the logic
-    //
-    // Each operator should call alloc_resource() to prepare resource to do data compute.
-    // if ExecNode split to sink and source operator, alloc_resource() should be called in sink operator
+    /**
+     * Allocate resources needed by this operator.
+     *
+     * This is called when the current pipeline is scheduled for the first time.
+     * e.g. If we have three pipelines with dependencies A -> B and B -> C, the `open`
+     * method of every operator in pipeline C will be called once pipelines A and B have finished.
+     *
+     * Currently each pipeline has only one task, so this works fine.
+     * But if one pipeline has multiple tasks running in parallel, we need to rethink this logic.
+     */
     virtual Status open(RuntimeState* state) = 0;
 
-    // Release the resource, should not block the thread
-    //
-    // Each operator should call close_self() to release resource
-    // if ExecNode split to sink and source operator, close_self() should be called in source operator
+    /**
+     * Release all resources once this operator has finished its work.
+     */
     virtual Status close(RuntimeState* state) = 0;
 
     Status set_child(OperatorPtr child) {
@@ -171,13 +181,19 @@ public:
 
     virtual bool can_write() { return false; } // for sink
 
-    // for pipeline
+    /**
+     * The main method to execute a pipeline task.
+     * The pipeline is pull-based: each operator pulls data from its child via this method.
+     */
     virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
                              SourceState& result_state) {
         return Status::OK();
     };
 
-    // return can write continue
+    /**
+     * Push data to the sink operator.
+     * Data in this block will eventually be sent via RPC or written somewhere.
+     */
     virtual Status sink(RuntimeState* state, vectorized::Block* block,
                         SourceState source_state) = 0;
 
@@ -187,15 +203,16 @@ public:
         return Status::NotSupported(error_msg.str());
     }
 
-    // close be called
-    // - Source: scan thread do not exist
-    // - Sink: RPC do not be disposed
-    // - else return false
+    /**
+     * pending_finish means `close` has already been called, but there is still some
+     * work to do before this operator is finished.
+     *
+     * For a source operator, it is pending_finish iff its scan threads have not been released yet.
+     * For a sink operator, it is pending_finish iff its RPC resources have not been released yet.
+     * Otherwise, it returns false.
+     */
     virtual bool is_pending_finish() const { return false; }
 
-    // TODO: should we keep the function
-    // virtual bool is_finished() = 0;
-
     bool is_closed() const { return _is_closed; }
 
     MemTracker* mem_tracker() const { return _mem_tracker.get(); }
@@ -211,8 +228,6 @@ protected:
     std::unique_ptr<MemTracker> _mem_tracker;
 
     OperatorBuilderBase* _operator_builder;
-    // source has no child
-    // if an operator is not source, it will get data from its child.
     OperatorPtr _child;
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
@@ -223,6 +238,11 @@ private:
     bool _is_closed = false;
 };
 
+/**
+ * All operators inherited from DataSinkOperator will hold a DataSink inside. Namely, there is a one-to-one relation between DataSinkOperator and DataSink.
+ *
+ * It should be mentioned that not all sink operators inherit from this class (e.g. SortSinkOperator, which holds a sort node inside instead of a DataSink).
+ */
 template <typename OperatorBuilderType>
 class DataSinkOperator : public OperatorBase {
 public:
@@ -275,6 +295,9 @@ protected:
     NodeType* _sink;
 };
 
+/**
+ * All operators inherited from Operator will hold an ExecNode inside.
+ */
 template <typename OperatorBuilderType>
 class Operator : public OperatorBase {
 public:
@@ -337,18 +360,26 @@ protected:
     NodeType* _node;
 };
 
+/**
+ * StatefulOperator indicates the operators with some states inside.
+ *
+ * Specifically, we call an operator stateful if it can determine its output by itself.
+ * For example, the hash join probe operator is a typical StatefulOperator. When it gets a block from the probe side, it will hold this block inside (e.g. _child_block).
+ * If there are still remaining rows in the probe block, we can get an output block by calling `get_block` without any data from its child.
+ * In a nutshell, it is a one-to-many relation between input blocks and output blocks for StatefulOperator.
+ */
 template <typename OperatorBuilderType>
-class DataStateOperator : public Operator<OperatorBuilderType> {
+class StatefulOperator : public Operator<OperatorBuilderType> {
 public:
     using NodeType =
             std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+    StatefulOperator(OperatorBuilderBase* builder, ExecNode* node)
             : Operator<OperatorBuilderType>(builder, node),
               _child_block(new vectorized::Block),
               _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
 
-    virtual ~DataStateOperator() = default;
+    virtual ~StatefulOperator() = default;
 
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override {
diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp
index 0af15b2a31..def1f6da9d 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -21,6 +21,6 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(RepeatOperator, DataStateOperator)
+OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator)
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h
index 5254a47d2a..15707ea39c 100644
--- a/be/src/pipeline/exec/repeat_operator.h
+++ b/be/src/pipeline/exec/repeat_operator.h
@@ -34,7 +34,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class RepeatOperator final : public DataStateOperator<RepeatOperatorBuilder> {
+class RepeatOperator final : public StatefulOperator<RepeatOperatorBuilder> {
 public:
     RepeatOperator(OperatorBuilderBase* operator_builder, ExecNode* repeat_node);
 };
diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp
index be8dd0f131..8146dc2ea0 100644
--- a/be/src/pipeline/exec/table_function_operator.cpp
+++ b/be/src/pipeline/exec/table_function_operator.cpp
@@ -19,6 +19,6 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(TableFunctionOperator, DataStateOperator)
+OPERATOR_CODE_GENERATOR(TableFunctionOperator, StatefulOperator)
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h
index f106abab07..f2c0437101 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -29,7 +29,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class TableFunctionOperator final : public DataStateOperator<TableFunctionOperatorBuilder> {
+class TableFunctionOperator final : public StatefulOperator<TableFunctionOperatorBuilder> {
 public:
     TableFunctionOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
 };
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 051775f094..5c146cd3f5 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -211,7 +211,7 @@ void TaskScheduler::_do_work(size_t index) {
         if (!task) {
             task = queue->steal_take(index);
             if (!task) {
-                // TODO: The take is a stock method, rethink the logic
+                // TODO: The take is a blocking method, rethink the logic
                 task = queue->take(index);
                 if (!task) {
                     continue;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org