You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/20 03:20:28 UTC
[doris] branch master updated: [opt](hudi) using native reader to read the base file with no log file (#20988)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 923f7edad0 [opt](hudi) using native reader to read the base file with no log file (#20988)
923f7edad0 is described below
commit 923f7edad081d77c17279e7aa327ca2c7984fdb2
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Tue Jun 20 11:20:21 2023 +0800
[opt](hudi) using native reader to read the base file with no log file (#20988)
Two optimizations:
1. Insert string bytes directly to avoid the decoding and re-encoding step.
2. Use the native reader to read a Hudi base file when it has no log files. `explain` now shows how many splits are read natively.
---
be/src/vec/exec/scan/vfile_scanner.cpp | 18 +++++++++++++-----
fe/be-java-extensions/hudi-scanner/pom.xml | 11 ++++++++++-
.../java/org/apache/doris/hudi/HudiColumnValue.java | 18 ++++++++++++++++++
.../org/apache/doris/common/jni/MockJniScanner.java | 5 +++++
.../org/apache/doris/common/jni/vec/ColumnValue.java | 3 +++
.../org/apache/doris/common/jni/vec/ScanPredicate.java | 5 +++++
.../org/apache/doris/common/jni/vec/VectorColumn.java | 6 +++++-
.../apache/doris/maxcompute/MaxComputeColumnValue.java | 5 +++++
.../org/apache/doris/paimon/PaimonColumnValue.java | 5 +++++
.../doris/planner/external/FileQueryScanNode.java | 7 +++----
.../doris/planner/external/hudi/HudiScanNode.java | 17 ++++++++++++++++-
11 files changed, 88 insertions(+), 12 deletions(-)
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index b0a2d2e327..a5ebf34645 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -585,10 +585,18 @@ Status VFileScanner::_get_next_reader() {
_current_range_path = range.path;
// create reader for specific format
- // TODO: add json, avro
Status init_status;
- // TODO: use data lake type
- switch (_params.format_type) {
+ TFileFormatType::type format_type = _params.format_type;
+ // JNI reader can only push down column value range
+ bool push_down_predicates = !_is_load && _params.format_type != TFileFormatType::FORMAT_JNI;
+ if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "hudi") {
+ if (range.table_format_params.hudi_params.delta_logs.empty()) {
+ // fall back to native reader if there is no log file
+ format_type = TFileFormatType::FORMAT_PARQUET;
+ }
+ }
+ switch (format_type) {
case TFileFormatType::FORMAT_JNI: {
if (_real_tuple_desc->table_desc()->table_type() ==
::doris::TTableType::type::MAX_COMPUTE_TABLE) {
@@ -625,7 +633,7 @@ Status VFileScanner::_get_next_reader() {
SCOPED_TIMER(_open_reader_timer);
RETURN_IF_ERROR(parquet_reader->open());
}
- if (!_is_load && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
+ if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
_push_down_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
@@ -660,7 +668,7 @@ Status VFileScanner::_get_next_reader() {
std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
_profile, _state, _params, range, _state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
- if (!_is_load && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
+ if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
_push_down_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
diff --git a/fe/be-java-extensions/hudi-scanner/pom.xml b/fe/be-java-extensions/hudi-scanner/pom.xml
index e07e5aedb5..71564e4be3 100644
--- a/fe/be-java-extensions/hudi-scanner/pom.xml
+++ b/fe/be-java-extensions/hudi-scanner/pom.xml
@@ -37,6 +37,12 @@ under the License.
<groupId>org.apache.doris</groupId>
<artifactId>java-common</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>fe-common</artifactId>
+ <groupId>org.apache.doris</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -64,6 +70,10 @@ under the License.
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>hudi-hadoop-mr</artifactId>
+ <groupId>org.apache.hudi</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -92,7 +102,6 @@ under the License.
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
- <!-- conflict with hudi-*-bundle -->
</dependencies>
<build>
<finalName>hudi-scanner</finalName>
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
index 4b4a7bbcf7..4a7ea36e44 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
@@ -25,10 +25,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -62,6 +65,11 @@ public class HudiColumnValue implements ColumnValue {
return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData);
}
+ @Override
+ public boolean canGetStringAsBytes() {
+ return true;
+ }
+
@Override
public boolean isNull() {
return false;
@@ -143,6 +151,16 @@ public class HudiColumnValue implements ColumnValue {
@Override
public byte[] getBytes() {
+ // Read bytes straight from BytesWritable/Text to skip decoding&encoding; copyBytes() trims the backing array to the valid length
+ if (fieldData instanceof BytesWritable) {
+ return ((BytesWritable) fieldData).copyBytes();
+ }
+ if (fieldData instanceof Text) {
+ return ((Text) fieldData).copyBytes();
+ }
+ if (fieldData instanceof String) {
+ return ((String) fieldData).getBytes(StandardCharsets.UTF_8);
+ }
return (byte[]) inspectObject();
}
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
index b559f2a0af..14a412dccb 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
@@ -50,6 +50,11 @@ public class MockJniScanner extends JniScanner {
this.j = j;
}
+ @Override
+ public boolean canGetStringAsBytes() {
+ return false;
+ }
+
@Override
public boolean isNull() {
return false;
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java
index 0495179260..fa2e268366 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java
@@ -27,6 +27,9 @@ import java.util.List;
* Column value in vector column
*/
public interface ColumnValue {
+ // Get bytes directly when reading string value to avoid decoding&encoding
+ boolean canGetStringAsBytes();
+
boolean isNull();
boolean getBoolean();
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java
index f9b35b2352..4553d29e18 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java
@@ -118,6 +118,11 @@ public class ScanPredicate {
}
}
+ @Override
+ public boolean canGetStringAsBytes() {
+ return false;
+ }
+
@Override
public String toString() {
return inspectObject().toString();
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
index 70b358a87f..f65ea8fae7 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
@@ -597,7 +597,11 @@ public class VectorColumn {
case CHAR:
case VARCHAR:
case STRING:
- appendStringAndOffset(o.getString());
+ if (o.canGetStringAsBytes()) {
+ appendBytesAndOffset(o.getBytes());
+ } else {
+ appendStringAndOffset(o.getString());
+ }
break;
case BINARY:
appendBytesAndOffset(o.getBytes());
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
index 4c3aff97b4..a2237b4cc4 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
@@ -57,6 +57,11 @@ public class MaxComputeColumnValue implements ColumnValue {
this.idx = 0;
}
+ @Override
+ public boolean canGetStringAsBytes() {
+ return false;
+ }
+
@Override
public boolean isNull() {
return column.isNull(idx);
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
index e0a89c9db2..d10c876de0 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
@@ -44,6 +44,11 @@ public class PaimonColumnValue implements ColumnValue {
this.record = record;
}
+ @Override
+ public boolean canGetStringAsBytes() {
+ return false;
+ }
+
@Override
public boolean getBoolean() {
return record.getBoolean(idx);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 498401b05b..ed004ff170 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -244,9 +244,10 @@ public abstract class FileQueryScanNode extends FileScanNode {
// set hdfs params for hdfs file type.
Map<String, String> locationProperties = getLocationProperties();
- if (fileFormatType == TFileFormatType.FORMAT_JNI) {
+ if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) {
params.setProperties(locationProperties);
- } else if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
+ }
+ if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
String fsName = getFsName(inputSplit);
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
tHdfsParams.setFsName(fsName);
@@ -259,8 +260,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
}
- } else if (locationType == TFileType.FILE_S3) {
- params.setProperties(locationProperties);
}
List<String> pathPartitionKeys = getPathPartitionKeys();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
index 41b9ac80b4..fdb50e78a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
@@ -30,6 +30,7 @@ import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.planner.external.TableFormatType;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.THudiFileDesc;
@@ -68,6 +69,8 @@ public class HudiScanNode extends HiveScanNode {
private final boolean isCowTable;
+ private long noLogsSplitNum = 0;
+
/**
* External file scan node for Query Hudi table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -140,8 +143,11 @@ public class HudiScanNode extends HiveScanNode {
public List<Split> getSplits() throws UserException {
if (isCowTable) {
// skip hidden files start with "."
- return super.getSplits().stream().filter(split -> !((FileSplit) split).getPath().getName().startsWith("."))
+ List<Split> cowSplits = super.getSplits().stream()
+ .filter(split -> !((FileSplit) split).getPath().getName().startsWith("."))
.collect(Collectors.toList());
+ noLogsSplitNum = cowSplits.size();
+ return cowSplits;
}
HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
@@ -211,6 +217,9 @@ public class HudiScanNode extends HiveScanNode {
List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath).map(Path::toString)
.collect(Collectors.toList());
+ if (logs.isEmpty()) {
+ noLogsSplitNum++;
+ }
HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
partition.getPartitionValues());
@@ -233,4 +242,10 @@ public class HudiScanNode extends HiveScanNode {
}
return splits;
}
+
+ @Override
+ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
+ return super.getNodeExplainString(prefix, detailLevel)
+ + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum, inputSplitsNum);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org