You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/09 07:52:58 UTC

[doris] branch master updated: [improvement](multi-catalog) Impl parallel for file scanner to improve the scanner performance (#10620)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 24d824a783 [improvement](multi-catalog) Impl parallel for file scanner to improve the scanner performance (#10620)
24d824a783 is described below

commit 24d824a78390e88a63bb448e75c0c7fd1aa5c910
Author: huangzhaowei <hu...@bytedance.com>
AuthorDate: Sat Jul 9 15:52:53 2022 +0800

    [improvement](multi-catalog) Impl parallel for file scanner to improve the scanner performance (#10620)
    
    Add multi-thread support to FileScanNode on the BE and implement the file split logic in the FE.
---
 be/src/vec/exec/file_arrow_scanner.cpp             |  2 +-
 be/src/vec/exec/file_scan_node.cpp                 | 31 ++++++++++------
 be/src/vec/exec/file_scan_node.h                   | 12 +++++-
 .../doris/catalog/external/HMSExternalTable.java   |  4 +-
 .../main/java/org/apache/doris/common/Config.java  |  6 +++
 .../planner/external/ExternalFileScanNode.java     | 43 +++++++++++++++++++++-
 .../planner/external/ExternalHudiScanProvider.java |  2 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  2 +-
 8 files changed, 81 insertions(+), 21 deletions(-)

diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 9adad2d71a..88ebca2bb0 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -37,7 +37,7 @@ FileArrowScanner::FileArrowScanner(RuntimeState* state, RuntimeProfile* profile,
           _arrow_batch_cur_idx(0) {}
 
 FileArrowScanner::~FileArrowScanner() {
-    close();
+    FileArrowScanner::close();
 }
 
 Status FileArrowScanner::_open_next_reader() {
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
index 741b66dd81..ff4989a033 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -96,9 +96,14 @@ Status FileScanNode::open(RuntimeState* state) {
 Status FileScanNode::start_scanners() {
     {
         std::unique_lock<std::mutex> l(_batch_queue_lock);
-        _num_running_scanners = 1;
+        _num_running_scanners = _scan_ranges.size();
+    }
+
+    _scanners_status.resize(_scan_ranges.size());
+    for (int i = 0; i < _scan_ranges.size(); i++) {
+        _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
+                                      std::ref(_scanners_status[i]));
     }
-    _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, 0, _scan_ranges.size());
     return Status::OK();
 }
 
@@ -203,7 +208,10 @@ Status FileScanNode::close(RuntimeState* state) {
     for (int i = 0; i < _scanner_threads.size(); ++i) {
         _scanner_threads[i].join();
     }
-
+    for (int i = 0; i < _scanners_status.size(); i++) {
+        std::future<Status> f = _scanners_status[i].get_future();
+        RETURN_IF_ERROR(f.get());
+    }
     // Close
     _batch_queue.clear();
     return ExecNode::close(state);
@@ -264,18 +272,16 @@ Status FileScanNode::scanner_scan(const TFileScanRange& scan_range, ScannerCount
     return Status::OK();
 }
 
-void FileScanNode::scanner_worker(int start_idx, int length) {
+void FileScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) {
     Thread::set_self_name("file_scanner");
     Status status = Status::OK();
     ScannerCounter counter;
-    for (int i = 0; i < length && status.ok(); ++i) {
-        const TFileScanRange& scan_range =
-                _scan_ranges[start_idx + i].scan_range.ext_scan_range.file_scan_range;
-        status = scanner_scan(scan_range, &counter);
-        if (!status.ok()) {
-            LOG(WARNING) << "Scanner[" << start_idx + i
-                         << "] process failed. status=" << status.get_error_msg();
-        }
+    const TFileScanRange& scan_range =
+            _scan_ranges[start_idx].scan_range.ext_scan_range.file_scan_range;
+    status = scanner_scan(scan_range, &counter);
+    if (!status.ok()) {
+        LOG(WARNING) << "Scanner[" << start_idx
+                     << "] process failed. status=" << status.get_error_msg();
     }
 
     // Update stats
@@ -296,6 +302,7 @@ void FileScanNode::scanner_worker(int start_idx, int length) {
     if (!status.ok()) {
         _queue_writer_cond.notify_all();
     }
+    p_status.set_value(status);
 }
 
 std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& scan_range,
diff --git a/be/src/vec/exec/file_scan_node.h b/be/src/vec/exec/file_scan_node.h
index 5106d654cc..2d7deb9626 100644
--- a/be/src/vec/exec/file_scan_node.h
+++ b/be/src/vec/exec/file_scan_node.h
@@ -17,7 +17,14 @@
 
 #pragma once
 
-#include <memory>
+#include <atomic>
+#include <condition_variable>
+#include <future>
+#include <map>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
 
 #include "common/status.h"
 #include "exec/base_scanner.h"
@@ -77,7 +84,7 @@ private:
 
     Status start_scanners();
 
-    void scanner_worker(int start_idx, int length);
+    void scanner_worker(int start_idx, int length, std::promise<Status>& p_status);
     // Scan one range
     Status scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter);
 
@@ -99,6 +106,7 @@ private:
     Status _process_status;
 
     std::vector<std::thread> _scanner_threads;
+    std::vector<std::promise<Status>> _scanners_status;
 
     int _max_buffered_batches;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 93231a35b6..d6fa3eb34f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -239,8 +239,8 @@ public class HMSExternalTable extends ExternalTable {
     @Override
     public TTableDescriptor toThrift() {
         THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
-        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, fullSchema.size(), 0,
-                getName(), "");
+        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, fullSchema.size(), 0,
+                getName(), dbName);
         tTableDescriptor.setHiveTable(tHiveTable);
         return tTableDescriptor;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index f411669a8c..c64de8ea6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1664,6 +1664,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false, masterOnly = true)
     public static boolean enable_multi_catalog = false; // 1 min
 
+    @ConfField(mutable = true, masterOnly = false)
+    public static long file_scan_node_split_size = 256 * 1024 * 1024; // 256mb
+
+    @ConfField(mutable = true, masterOnly = false)
+    public static long file_scan_node_split_num = 128;
+
     /**
      * If set to TRUE, FE will:
      * 1. divide BE into high load and low load(no mid load) to force triggering tablet scheduling;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 3c9b5d19c1..db31d834e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -134,6 +134,30 @@ public class ExternalFileScanNode extends ExternalScanNode {
         }
     }
 
+    private static class FileSplitStrategy {
+        private long totalSplitSize;
+        private int splitNum;
+
+        FileSplitStrategy() {
+            this.totalSplitSize = 0;
+            this.splitNum = 0;
+        }
+
+        public void update(FileSplit split) {
+            totalSplitSize += split.getLength();
+            splitNum++;
+        }
+
+        public boolean hasNext() {
+            return totalSplitSize > Config.file_scan_node_split_size || splitNum > Config.file_scan_node_split_num;
+        }
+
+        public void next() {
+            totalSplitSize = 0;
+            splitNum = 0;
+        }
+    }
+
     private final BackendPolicy backendPolicy = new BackendPolicy();
 
     private final ParamCreateContext context = new ParamCreateContext();
@@ -222,6 +246,9 @@ public class ExternalFileScanNode extends ExternalScanNode {
         partitionKeys.addAll(scanProvider.getPathPartitionKeys());
         context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
         for (SlotDescriptor slot : desc.getSlots()) {
+            if (!slot.isMaterialized()) {
+                continue;
+            }
             int slotId = slotDescByName.get(slot.getColumn().getName()).getId().asInt();
 
             TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
@@ -254,11 +281,13 @@ public class ExternalFileScanNode extends ExternalScanNode {
         String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
 
-        // Todo: now every split will assign one scan range, we can merge them for optimize.
+        TScanRangeLocations curLocations = newLocations(context.params);
+
+        FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
+
         for (InputSplit split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
 
-            TScanRangeLocations curLocations = newLocations(context.params);
             List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
                     partitionKeys);
 
@@ -270,6 +299,15 @@ public class ExternalFileScanNode extends ExternalScanNode {
                     + " with table split: " +  fileSplit.getPath()
                     + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")");
 
+            fileSplitStrategy.update(fileSplit);
+            // Add a new location when it's can be split
+            if (fileSplitStrategy.hasNext()) {
+                scanRangeLocations.add(curLocations);
+                curLocations = newLocations(context.params);
+                fileSplitStrategy.next();
+            }
+        }
+        if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) {
             scanRangeLocations.add(curLocations);
         }
     }
@@ -340,6 +378,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
 
     @Override
     public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
+        LOG.debug("There is {} scanRangeLocations for execution.", scanRangeLocations.size());
         return scanRangeLocations;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java
index 1638d9429c..c32a2ac3d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java
@@ -27,7 +27,7 @@ import java.util.List;
 
 /**
  * A file scan provider for hudi.
- * HudiProvier is extended with hive since they both use input format interface to get the spilt.
+ * HudiProvier is extended with hive since they both use input format interface to get the split.
  */
 public class ExternalHudiScanProvider extends ExternalHiveScanProvider {
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e9bf9ae943..fb2696d72a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1445,7 +1445,7 @@ public class Coordinator {
             // we should make sure
             // 1. same bucket in some address be
             // 2. different scanNode id scan different scanRange which belong to the scanNode id
-            // 3. split how many scanRange one instance should scan, same bucket do not spilt to different instance
+            // 3. split how many scanRange one instance should scan, same bucket do not split to different instance
             Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges
                     = Pair.create(scanRanges.getKey(), filteredNodeScanRanges);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org