You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by as...@apache.org on 2024/04/09 03:51:11 UTC

(doris) 01/01: Revert "[opt](scan) read scan ranges in the order of partitions (#31630)"

This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch revert-31630-opt_partition
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5240865fd7101bdc6f82a7c65993757355c49977
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Tue Apr 9 11:51:01 2024 +0800

    Revert "[opt](scan) read scan ranges in the order of partitions (#31630)"
    
    This reverts commit 5d99dffe6f1a3fcb107ce56181aeff96ef222def.
---
 be/src/pipeline/exec/file_scan_operator.cpp | 55 ++++++-----------------------
 be/src/vec/exec/scan/new_file_scan_node.cpp | 55 ++++++-----------------------
 2 files changed, 22 insertions(+), 88 deletions(-)

diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp
index 9d48fce2552..ac193147dfb 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -73,53 +73,20 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
         _scan_ranges = scan_ranges;
     } else {
         // There is no need for the number of scanners to exceed the number of threads in thread pool.
-        _scan_ranges.resize(max_scanners);
-        std::vector<TScanRangeParams>& scan_ranges_ =
-                const_cast<std::vector<TScanRangeParams>&>(scan_ranges);
-        auto& first_ranges = scan_ranges_[0].scan_range.ext_scan_range.file_scan_range.ranges;
-        if (first_ranges[0].__isset.columns_from_path_keys &&
-            !first_ranges[0].columns_from_path_keys.empty()) {
-            int num_keys = first_ranges[0].columns_from_path_keys.size();
-            // In the insert statement, reading data in partition order can reduce the memory usage of BE
-            // and prevent the generation of smaller tables.
-            std::sort(scan_ranges_.begin(), scan_ranges_.end(),
-                      [&num_keys](TScanRangeParams r1, TScanRangeParams r2) {
-                          auto& path1 = r1.scan_range.ext_scan_range.file_scan_range.ranges[0]
-                                                .columns_from_path;
-                          auto& path2 = r2.scan_range.ext_scan_range.file_scan_range.ranges[0]
-                                                .columns_from_path;
-                          for (int i = 0; i < num_keys; ++i) {
-                              if (path1[i] < path2[i]) {
-                                  return true;
-                              }
-                          }
-                          return false;
-                      });
+        _scan_ranges.clear();
+        auto range_iter = scan_ranges.begin();
+        for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
+            _scan_ranges.push_back(*range_iter);
         }
-        int num_ranges = scan_ranges.size() / max_scanners;
-        int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
-        int scan_index = 0;
-        int range_index = 0;
-        for (int i = 0; i < num_add_one; ++i) {
-            _scan_ranges[scan_index] = scan_ranges_[range_index++];
-            auto& ranges =
-                    _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
-            for (int j = 0; j < num_ranges; j++) {
-                auto& merged_ranges = scan_ranges_[range_index++]
-                                              .scan_range.ext_scan_range.file_scan_range.ranges;
-                ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
-            }
-        }
-        for (int i = num_add_one; i < max_scanners; ++i) {
-            _scan_ranges[scan_index] = scan_ranges_[range_index++];
-            auto& ranges =
-                    _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
-            for (int j = 0; j < num_ranges - 1; j++) {
-                auto& merged_ranges = scan_ranges_[range_index++]
-                                              .scan_range.ext_scan_range.file_scan_range.ranges;
-                ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
+        for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
+            if (i == max_scanners) {
+                i = 0;
             }
+            auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
+            auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
+            ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
         }
+        _scan_ranges.shrink_to_fit();
         LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
     }
     if (scan_ranges.size() > 0 &&
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index a0ae03a8647..2ce80f4463a 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -71,53 +71,20 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
         _scan_ranges = scan_ranges;
     } else {
         // There is no need for the number of scanners to exceed the number of threads in thread pool.
-        _scan_ranges.resize(max_scanners);
-        std::vector<TScanRangeParams>& scan_ranges_ =
-                const_cast<std::vector<TScanRangeParams>&>(scan_ranges);
-        auto& first_ranges = scan_ranges_[0].scan_range.ext_scan_range.file_scan_range.ranges;
-        if (first_ranges[0].__isset.columns_from_path_keys &&
-            !first_ranges[0].columns_from_path_keys.empty()) {
-            int num_keys = first_ranges[0].columns_from_path_keys.size();
-            // In the insert statement, reading data in partition order can reduce the memory usage of BE
-            // and prevent the generation of smaller tables.
-            std::sort(scan_ranges_.begin(), scan_ranges_.end(),
-                      [&num_keys](TScanRangeParams r1, TScanRangeParams r2) {
-                          auto& path1 = r1.scan_range.ext_scan_range.file_scan_range.ranges[0]
-                                                .columns_from_path;
-                          auto& path2 = r2.scan_range.ext_scan_range.file_scan_range.ranges[0]
-                                                .columns_from_path;
-                          for (int i = 0; i < num_keys; ++i) {
-                              if (path1[i] < path2[i]) {
-                                  return true;
-                              }
-                          }
-                          return false;
-                      });
+        _scan_ranges.clear();
+        auto range_iter = scan_ranges.begin();
+        for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
+            _scan_ranges.push_back(*range_iter);
         }
-        int num_ranges = scan_ranges.size() / max_scanners;
-        int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
-        int scan_index = 0;
-        int range_index = 0;
-        for (int i = 0; i < num_add_one; ++i) {
-            _scan_ranges[scan_index] = scan_ranges_[range_index++];
-            auto& ranges =
-                    _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
-            for (int j = 0; j < num_ranges; j++) {
-                auto& merged_ranges = scan_ranges_[range_index++]
-                                              .scan_range.ext_scan_range.file_scan_range.ranges;
-                ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
-            }
-        }
-        for (int i = num_add_one; i < max_scanners; ++i) {
-            _scan_ranges[scan_index] = scan_ranges_[range_index++];
-            auto& ranges =
-                    _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
-            for (int j = 0; j < num_ranges - 1; j++) {
-                auto& merged_ranges = scan_ranges_[range_index++]
-                                              .scan_range.ext_scan_range.file_scan_range.ranges;
-                ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
+        for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
+            if (i == max_scanners) {
+                i = 0;
             }
+            auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
+            auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
+            ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
         }
+        _scan_ranges.shrink_to_fit();
         LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
     }
     if (scan_ranges.size() > 0 &&


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org