You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/20 03:20:28 UTC

[doris] branch master updated: [opt](hudi) using native reader to read the base file with no log file (#20988)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 923f7edad0 [opt](hudi) using native reader to read the base file with no log file (#20988)
923f7edad0 is described below

commit 923f7edad081d77c17279e7aa327ca2c7984fdb2
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Tue Jun 20 11:20:21 2023 +0800

    [opt](hudi) using native reader to read the base file with no log file (#20988)
    
    Two optimizations:
    1. Insert string bytes directly to remove decoding&encoding process.
    2. Use native reader to read the hudi base file if it has no log file. Use `explain` to show how many splits are read natively.
---
 be/src/vec/exec/scan/vfile_scanner.cpp                 | 18 +++++++++++++-----
 fe/be-java-extensions/hudi-scanner/pom.xml             | 11 ++++++++++-
 .../java/org/apache/doris/hudi/HudiColumnValue.java    | 18 ++++++++++++++++++
 .../org/apache/doris/common/jni/MockJniScanner.java    |  5 +++++
 .../org/apache/doris/common/jni/vec/ColumnValue.java   |  3 +++
 .../org/apache/doris/common/jni/vec/ScanPredicate.java |  5 +++++
 .../org/apache/doris/common/jni/vec/VectorColumn.java  |  6 +++++-
 .../apache/doris/maxcompute/MaxComputeColumnValue.java |  5 +++++
 .../org/apache/doris/paimon/PaimonColumnValue.java     |  5 +++++
 .../doris/planner/external/FileQueryScanNode.java      |  7 +++----
 .../doris/planner/external/hudi/HudiScanNode.java      | 17 ++++++++++++++++-
 11 files changed, 88 insertions(+), 12 deletions(-)

diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index b0a2d2e327..a5ebf34645 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -585,10 +585,18 @@ Status VFileScanner::_get_next_reader() {
         _current_range_path = range.path;
 
         // create reader for specific format
-        // TODO: add json, avro
         Status init_status;
-        // TODO: use data lake type
-        switch (_params.format_type) {
+        TFileFormatType::type format_type = _params.format_type;
+        // JNI reader can only push down column value range
+        bool push_down_predicates = !_is_load && _params.format_type != TFileFormatType::FORMAT_JNI;
+        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params &&
+            range.table_format_params.table_format_type == "hudi") {
+            if (range.table_format_params.hudi_params.delta_logs.empty()) {
+                // fall back to native reader if there is no log file
+                format_type = TFileFormatType::FORMAT_PARQUET;
+            }
+        }
+        switch (format_type) {
         case TFileFormatType::FORMAT_JNI: {
             if (_real_tuple_desc->table_desc()->table_type() ==
                 ::doris::TTableType::type::MAX_COMPUTE_TABLE) {
@@ -625,7 +633,7 @@ Status VFileScanner::_get_next_reader() {
                 SCOPED_TIMER(_open_reader_timer);
                 RETURN_IF_ERROR(parquet_reader->open());
             }
-            if (!_is_load && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
+            if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
                 _push_down_conjuncts.resize(_conjuncts.size());
                 for (size_t i = 0; i != _conjuncts.size(); ++i) {
                     RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
@@ -660,7 +668,7 @@ Status VFileScanner::_get_next_reader() {
             std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                     _profile, _state, _params, range, _state->query_options().batch_size,
                     _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
-            if (!_is_load && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
+            if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
                 _push_down_conjuncts.resize(_conjuncts.size());
                 for (size_t i = 0; i != _conjuncts.size(); ++i) {
                     RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
diff --git a/fe/be-java-extensions/hudi-scanner/pom.xml b/fe/be-java-extensions/hudi-scanner/pom.xml
index e07e5aedb5..71564e4be3 100644
--- a/fe/be-java-extensions/hudi-scanner/pom.xml
+++ b/fe/be-java-extensions/hudi-scanner/pom.xml
@@ -37,6 +37,12 @@ under the License.
             <groupId>org.apache.doris</groupId>
             <artifactId>java-common</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>fe-common</artifactId>
+                    <groupId>org.apache.doris</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -64,6 +70,10 @@ under the License.
                     <groupId>org.apache.avro</groupId>
                     <artifactId>avro</artifactId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>hudi-hadoop-mr</artifactId>
+                    <groupId>org.apache.hudi</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -92,7 +102,6 @@ under the License.
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
-        <!-- conflict with hudi-*-bundle -->
     </dependencies>
     <build>
         <finalName>hudi-scanner</finalName>
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
index 4b4a7bbcf7..4a7ea36e44 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
@@ -25,10 +25,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -62,6 +65,11 @@ public class HudiColumnValue implements ColumnValue {
         return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData);
     }
 
+    @Override
+    public boolean canGetStringAsBytes() {
+        return true;
+    }
+
     @Override
     public boolean isNull() {
         return false;
@@ -143,6 +151,16 @@ public class HudiColumnValue implements ColumnValue {
 
     @Override
     public byte[] getBytes() {
+        // Get bytes directly if fieldData is BytesWritable or Text to avoid decoding&encoding
+        if (fieldData instanceof BytesWritable) {
+            return ((BytesWritable) fieldData).getBytes();
+        }
+        if (fieldData instanceof Text) {
+            return ((Text) fieldData).getBytes();
+        }
+        if (fieldData instanceof String) {
+            return ((String) fieldData).getBytes(StandardCharsets.UTF_8);
+        }
         return (byte[]) inspectObject();
     }
 
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
index b559f2a0af..14a412dccb 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
@@ -50,6 +50,11 @@ public class MockJniScanner extends JniScanner {
             this.j = j;
         }
 
+        @Override
+        public boolean canGetStringAsBytes() {
+            return false;
+        }
+
         @Override
         public boolean isNull() {
             return false;
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java
index 0495179260..fa2e268366 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java
@@ -27,6 +27,9 @@ import java.util.List;
  * Column value in vector column
  */
 public interface ColumnValue {
+    // Get bytes directly when reading string value to avoid decoding&encoding
+    boolean canGetStringAsBytes();
+
     boolean isNull();
 
     boolean getBoolean();
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java
index f9b35b2352..4553d29e18 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java
@@ -118,6 +118,11 @@ public class ScanPredicate {
             }
         }
 
+        @Override
+        public boolean canGetStringAsBytes() {
+            return false;
+        }
+
         @Override
         public String toString() {
             return inspectObject().toString();
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
index 70b358a87f..f65ea8fae7 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
@@ -597,7 +597,11 @@ public class VectorColumn {
             case CHAR:
             case VARCHAR:
             case STRING:
-                appendStringAndOffset(o.getString());
+                if (o.canGetStringAsBytes()) {
+                    appendBytesAndOffset(o.getBytes());
+                } else {
+                    appendStringAndOffset(o.getString());
+                }
                 break;
             case BINARY:
                 appendBytesAndOffset(o.getBytes());
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
index 4c3aff97b4..a2237b4cc4 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
@@ -57,6 +57,11 @@ public class MaxComputeColumnValue implements ColumnValue {
         this.idx = 0;
     }
 
+    @Override
+    public boolean canGetStringAsBytes() {
+        return false;
+    }
+
     @Override
     public boolean isNull() {
         return column.isNull(idx);
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
index e0a89c9db2..d10c876de0 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
@@ -44,6 +44,11 @@ public class PaimonColumnValue implements ColumnValue {
         this.record = record;
     }
 
+    @Override
+    public boolean canGetStringAsBytes() {
+        return false;
+    }
+
     @Override
     public boolean getBoolean() {
         return record.getBoolean(idx);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 498401b05b..ed004ff170 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -244,9 +244,10 @@ public abstract class FileQueryScanNode extends FileScanNode {
 
         // set hdfs params for hdfs file type.
         Map<String, String> locationProperties = getLocationProperties();
-        if (fileFormatType == TFileFormatType.FORMAT_JNI) {
+        if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) {
             params.setProperties(locationProperties);
-        } else if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
+        }
+        if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
             String fsName = getFsName(inputSplit);
             THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
             tHdfsParams.setFsName(fsName);
@@ -259,8 +260,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
                 }
                 params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
             }
-        } else if (locationType == TFileType.FILE_S3) {
-            params.setProperties(locationProperties);
         }
 
         List<String> pathPartitionKeys = getPathPartitionKeys();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
index 41b9ac80b4..fdb50e78a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
@@ -30,6 +30,7 @@ import org.apache.doris.planner.external.HiveScanNode;
 import org.apache.doris.planner.external.TableFormatType;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.THudiFileDesc;
@@ -68,6 +69,8 @@ public class HudiScanNode extends HiveScanNode {
 
     private final boolean isCowTable;
 
+    private long noLogsSplitNum = 0;
+
     /**
      * External file scan node for Query Hudi table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -140,8 +143,11 @@ public class HudiScanNode extends HiveScanNode {
     public List<Split> getSplits() throws UserException {
         if (isCowTable) {
             // skip hidden files start with "."
-            return super.getSplits().stream().filter(split -> !((FileSplit) split).getPath().getName().startsWith("."))
+            List<Split> cowSplits = super.getSplits().stream()
+                    .filter(split -> !((FileSplit) split).getPath().getName().startsWith("."))
                     .collect(Collectors.toList());
+            noLogsSplitNum = cowSplits.size();
+            return cowSplits;
         }
 
         HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
@@ -211,6 +217,9 @@ public class HudiScanNode extends HiveScanNode {
 
                     List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath).map(Path::toString)
                             .collect(Collectors.toList());
+                    if (logs.isEmpty()) {
+                        noLogsSplitNum++;
+                    }
 
                     HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
                             partition.getPartitionValues());
@@ -233,4 +242,10 @@ public class HudiScanNode extends HiveScanNode {
         }
         return splits;
     }
+
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
+        return super.getNodeExplainString(prefix, detailLevel)
+                + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum, inputSplitsNum);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org