You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2021/03/05 05:14:53 UTC

[GitHub] [incubator-doris] stdpain commented on a change in pull request #5468: [Enhancement] Support Pallralel Merge In Exchange Node

stdpain commented on a change in pull request #5468:
URL: https://github.com/apache/incubator-doris/pull/5468#discussion_r588037230



##########
File path: be/src/runtime/sorted_run_merger.cc
##########
@@ -99,6 +102,110 @@ class SortedRunMerger::BatchedRowSupplier {
     SortedRunMerger* _parent;
 };
 
+class SortedRunMerger::ParallelBatchedRowSupplier : public SortedRunMerger::BatchedRowSupplier {
+public:
+    // Construct an instance from a sorted input run.
+    ParallelBatchedRowSupplier(SortedRunMerger* parent, const RunBatchSupplier& sorted_run)
+            : BatchedRowSupplier(parent, sorted_run) {}
+
+    ~ParallelBatchedRowSupplier() {
+        // when have the limit clause need to wait the _pull_task_thread join terminate
+        _cancel = true;
+        _backup_ready = false;
+        _batch_prepared_cv.notify_one();
+        _pull_task_thread.join();
+
+        delete _input_row_batch;
+        delete _input_row_batch_backup;
+    }
+
+    // Retrieves the first batch of sorted rows from the run.
+    Status init(bool* done) override {
+        *done = false;
+        _pull_task_thread = std::thread(&SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task, this);
+
+        RETURN_IF_ERROR(next(NULL, done));
+        return Status::OK();
+    }
+
+    // Increment the current row index. If the current input batch is exhausted fetch the
+    // next one from the sorted run. Transfer ownership to transfer_batch if not NULL.
+    Status next(RowBatch* transfer_batch, bool* done) override {
+        ++_input_row_batch_index;
+        if (_input_row_batch && _input_row_batch_index < _input_row_batch->num_rows()) {
+            *done = false;
+        } else {
+            ScopedTimer<MonotonicStopWatch> timer(_parent->_get_next_batch_timer);
+            if (_input_row_batch && transfer_batch != NULL) {
+                _input_row_batch->transfer_resource_ownership(transfer_batch);
+            }
+            // release the mem of child merge
+            delete _input_row_batch;
+
+			std::unique_lock<std::mutex> lock(_mutex);
+            _batch_prepared_cv.wait(lock, [this](){ return _backup_ready.load(); });
+
+            // switch input_row_batch_backup to _input_row_batch
+            _input_row_batch = _input_row_batch_backup;
+			_input_row_batch_index = 0;
+            _input_row_batch_backup = nullptr;
+            _backup_ready = false;
+            DCHECK(_input_row_batch == nullptr || _input_row_batch->num_rows() > 0);
+
+            *done = _input_row_batch == nullptr;
+            _batch_prepared_cv.notify_one();
+        }
+        return Status::OK();
+    }
+
+private:
+    // The backup row batch input be backup batch from _sort_run.
+    RowBatch* _input_row_batch_backup;
+
+    std::atomic_bool _backup_ready{false};
+
+    std::atomic_bool _cancel{false};
+
+    std::thread _pull_task_thread;
+
+    Status _status_backup;
+
+    std::mutex _mutex;
+
+    // signal of new batch or the eos/cancelled condition
+    std::condition_variable _batch_prepared_cv;
+
+    struct NotifyCondition {
+    	explicit NotifyCondition(std::condition_variable& batch_prepared_cv) :
+    		_condition(batch_prepared_cv) {}
+    	~NotifyCondition() {
+    		_condition.notify_one();
+    	}
+    	std::condition_variable& _condition;
+    };
+
+    void process_sorted_run_task() {
+        std::unique_lock<std::mutex> lock(_mutex);
+        while (true) {
+            _batch_prepared_cv.wait(lock, [this]() { return !_backup_ready.load(); });
+            if (_cancel) {
+                break;
+            }
+
+            // do merge from sender queue data
+            _status_backup = _sorted_run(&_input_row_batch_backup);
+            _backup_ready = true;
+            std::unique_ptr<NotifyCondition> notify_condition(

Review comment:
       Maybe we could use defer_op instead of this?

##########
File path: be/src/runtime/sorted_run_merger.cc
##########
@@ -155,8 +263,17 @@ Status SortedRunMerger::prepare(const vector<RunBatchSupplier>& input_runs) {
     return Status::OK();
 }
 
+void SortedRunMerger::transfer_all_resources(class doris::RowBatch * transfer_resource_batch) {
+    for (BatchedRowSupplier* batched_row_supplier : _min_heap) {
+        auto row_batch = batched_row_supplier->get_row_batch();
+        if (row_batch != nullptr) {
+            row_batch->transfer_resource_ownership(transfer_resource_batch);
+        }
+    }
+}
+
 Status SortedRunMerger::get_next(RowBatch* output_batch, bool* eos) {
-    ScopedTimer<MonotonicStopWatch> timer(_get_next_timer);
+	ScopedTimer<MonotonicStopWatch> timer(_get_next_timer);

Review comment:
       Please fix the code formatting here: this line is now indented with a tab instead of spaces.

##########
File path: be/src/runtime/sorted_run_merger.cc
##########
@@ -129,16 +236,17 @@ SortedRunMerger::SortedRunMerger(const TupleRowComparator& compare_less_than,
                                  bool deep_copy_input)
         : _compare_less_than(compare_less_than),
           _input_row_desc(row_desc),
-          _deep_copy_input(deep_copy_input) {
-    _get_next_timer = ADD_TIMER(profile, "MergeGetNext");
-    _get_next_batch_timer = ADD_TIMER(profile, "MergeGetNextBatch");
+		  _deep_copy_input(deep_copy_input) {

Review comment:
       This needs code formatting: the line is now indented with tabs instead of spaces.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org