You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/09 07:52:58 UTC
[doris] branch master updated: [improvement](multi-catalog) Impl parallel for file scanner to improve the scanner performance (#10620)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 24d824a783 [improvement](multi-catalog) Impl parallel for file scanner to improve the scanner performance (#10620)
24d824a783 is described below
commit 24d824a78390e88a63bb448e75c0c7fd1aa5c910
Author: huangzhaowei <hu...@bytedance.com>
AuthorDate: Sat Jul 9 15:52:53 2022 +0800
[improvement](multi-catalog) Impl parallel for file scanner to improve the scanner performance (#10620)
Add multi-threaded scanning support to FileScanNode on the BE and implement the file split logic on the FE.
---
be/src/vec/exec/file_arrow_scanner.cpp | 2 +-
be/src/vec/exec/file_scan_node.cpp | 31 ++++++++++------
be/src/vec/exec/file_scan_node.h | 12 +++++-
.../doris/catalog/external/HMSExternalTable.java | 4 +-
.../main/java/org/apache/doris/common/Config.java | 6 +++
.../planner/external/ExternalFileScanNode.java | 43 +++++++++++++++++++++-
.../planner/external/ExternalHudiScanProvider.java | 2 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 2 +-
8 files changed, 81 insertions(+), 21 deletions(-)
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 9adad2d71a..88ebca2bb0 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -37,7 +37,7 @@ FileArrowScanner::FileArrowScanner(RuntimeState* state, RuntimeProfile* profile,
_arrow_batch_cur_idx(0) {}
FileArrowScanner::~FileArrowScanner() {
- close();
+ FileArrowScanner::close();
}
Status FileArrowScanner::_open_next_reader() {
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
index 741b66dd81..ff4989a033 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -96,9 +96,14 @@ Status FileScanNode::open(RuntimeState* state) {
Status FileScanNode::start_scanners() {
{
std::unique_lock<std::mutex> l(_batch_queue_lock);
- _num_running_scanners = 1;
+ _num_running_scanners = _scan_ranges.size();
+ }
+
+ _scanners_status.resize(_scan_ranges.size());
+ for (int i = 0; i < _scan_ranges.size(); i++) {
+ _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
+ std::ref(_scanners_status[i]));
}
- _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, 0, _scan_ranges.size());
return Status::OK();
}
@@ -203,7 +208,10 @@ Status FileScanNode::close(RuntimeState* state) {
for (int i = 0; i < _scanner_threads.size(); ++i) {
_scanner_threads[i].join();
}
-
+ for (int i = 0; i < _scanners_status.size(); i++) {
+ std::future<Status> f = _scanners_status[i].get_future();
+ RETURN_IF_ERROR(f.get());
+ }
// Close
_batch_queue.clear();
return ExecNode::close(state);
@@ -264,18 +272,16 @@ Status FileScanNode::scanner_scan(const TFileScanRange& scan_range, ScannerCount
return Status::OK();
}
-void FileScanNode::scanner_worker(int start_idx, int length) {
+void FileScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) {
Thread::set_self_name("file_scanner");
Status status = Status::OK();
ScannerCounter counter;
- for (int i = 0; i < length && status.ok(); ++i) {
- const TFileScanRange& scan_range =
- _scan_ranges[start_idx + i].scan_range.ext_scan_range.file_scan_range;
- status = scanner_scan(scan_range, &counter);
- if (!status.ok()) {
- LOG(WARNING) << "Scanner[" << start_idx + i
- << "] process failed. status=" << status.get_error_msg();
- }
+ const TFileScanRange& scan_range =
+ _scan_ranges[start_idx].scan_range.ext_scan_range.file_scan_range;
+ status = scanner_scan(scan_range, &counter);
+ if (!status.ok()) {
+ LOG(WARNING) << "Scanner[" << start_idx
+ << "] process failed. status=" << status.get_error_msg();
}
// Update stats
@@ -296,6 +302,7 @@ void FileScanNode::scanner_worker(int start_idx, int length) {
if (!status.ok()) {
_queue_writer_cond.notify_all();
}
+ p_status.set_value(status);
}
std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& scan_range,
diff --git a/be/src/vec/exec/file_scan_node.h b/be/src/vec/exec/file_scan_node.h
index 5106d654cc..2d7deb9626 100644
--- a/be/src/vec/exec/file_scan_node.h
+++ b/be/src/vec/exec/file_scan_node.h
@@ -17,7 +17,14 @@
#pragma once
-#include <memory>
+#include <atomic>
+#include <condition_variable>
+#include <future>
+#include <map>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
#include "common/status.h"
#include "exec/base_scanner.h"
@@ -77,7 +84,7 @@ private:
Status start_scanners();
- void scanner_worker(int start_idx, int length);
+ void scanner_worker(int start_idx, int length, std::promise<Status>& p_status);
// Scan one range
Status scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter);
@@ -99,6 +106,7 @@ private:
Status _process_status;
std::vector<std::thread> _scanner_threads;
+ std::vector<std::promise<Status>> _scanners_status;
int _max_buffered_batches;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 93231a35b6..d6fa3eb34f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -239,8 +239,8 @@ public class HMSExternalTable extends ExternalTable {
@Override
public TTableDescriptor toThrift() {
THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
- TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, fullSchema.size(), 0,
- getName(), "");
+ TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, fullSchema.size(), 0,
+ getName(), dbName);
tTableDescriptor.setHiveTable(tHiveTable);
return tTableDescriptor;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index f411669a8c..c64de8ea6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1664,6 +1664,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = false, masterOnly = true)
public static boolean enable_multi_catalog = false; // 1 min
+ @ConfField(mutable = true, masterOnly = false)
+ public static long file_scan_node_split_size = 256 * 1024 * 1024; // 256mb
+
+ @ConfField(mutable = true, masterOnly = false)
+ public static long file_scan_node_split_num = 128;
+
/**
* If set to TRUE, FE will:
* 1. divide BE into high load and low load(no mid load) to force triggering tablet scheduling;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 3c9b5d19c1..db31d834e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -134,6 +134,30 @@ public class ExternalFileScanNode extends ExternalScanNode {
}
}
+ private static class FileSplitStrategy {
+ private long totalSplitSize;
+ private int splitNum;
+
+ FileSplitStrategy() {
+ this.totalSplitSize = 0;
+ this.splitNum = 0;
+ }
+
+ public void update(FileSplit split) {
+ totalSplitSize += split.getLength();
+ splitNum++;
+ }
+
+ public boolean hasNext() {
+ return totalSplitSize > Config.file_scan_node_split_size || splitNum > Config.file_scan_node_split_num;
+ }
+
+ public void next() {
+ totalSplitSize = 0;
+ splitNum = 0;
+ }
+ }
+
private final BackendPolicy backendPolicy = new BackendPolicy();
private final ParamCreateContext context = new ParamCreateContext();
@@ -222,6 +246,9 @@ public class ExternalFileScanNode extends ExternalScanNode {
partitionKeys.addAll(scanProvider.getPathPartitionKeys());
context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
for (SlotDescriptor slot : desc.getSlots()) {
+ if (!slot.isMaterialized()) {
+ continue;
+ }
int slotId = slotDescByName.get(slot.getColumn().getName()).getId().asInt();
TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
@@ -254,11 +281,13 @@ public class ExternalFileScanNode extends ExternalScanNode {
String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
String fsName = fullPath.replace(filePath, "");
- // Todo: now every split will assign one scan range, we can merge them for optimize.
+ TScanRangeLocations curLocations = newLocations(context.params);
+
+ FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
+
for (InputSplit split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
- TScanRangeLocations curLocations = newLocations(context.params);
List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
partitionKeys);
@@ -270,6 +299,15 @@ public class ExternalFileScanNode extends ExternalScanNode {
+ " with table split: " + fileSplit.getPath()
+ " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")");
+ fileSplitStrategy.update(fileSplit);
+ // Close the current location and start a new one once the split threshold is exceeded
+ if (fileSplitStrategy.hasNext()) {
+ scanRangeLocations.add(curLocations);
+ curLocations = newLocations(context.params);
+ fileSplitStrategy.next();
+ }
+ }
+ if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) {
scanRangeLocations.add(curLocations);
}
}
@@ -340,6 +378,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
@Override
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
+ LOG.debug("There is {} scanRangeLocations for execution.", scanRangeLocations.size());
return scanRangeLocations;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java
index 1638d9429c..c32a2ac3d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java
@@ -27,7 +27,7 @@ import java.util.List;
/**
* A file scan provider for hudi.
- * HudiProvier is extended with hive since they both use input format interface to get the spilt.
+ * HudiProvier is extended with hive since they both use input format interface to get the split.
*/
public class ExternalHudiScanProvider extends ExternalHiveScanProvider {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e9bf9ae943..fb2696d72a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1445,7 +1445,7 @@ public class Coordinator {
// we should make sure
// 1. same bucket in some address be
// 2. different scanNode id scan different scanRange which belong to the scanNode id
- // 3. split how many scanRange one instance should scan, same bucket do not spilt to different instance
+ // 3. split how many scanRange one instance should scan, same bucket do not split to different instance
Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges
= Pair.create(scanRanges.getKey(), filteredNodeScanRanges);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org