Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/21 09:38:58 UTC

[doris] branch master updated: [feature-wip](multi-catalog) Support s3 storage for file scan node (#10977)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7147a7c290 [feature-wip](multi-catalog) Support s3 storage for file scan node (#10977)
7147a7c290 is described below

commit 7147a7c2906bdc488732a8c742db59ff0adcfa25
Author: huangzhaowei <hu...@bytedance.com>
AuthorDate: Thu Jul 21 17:38:53 2022 +0800

    [feature-wip](multi-catalog) Support s3 storage for file scan node (#10977)
    
    This is an example of an HMS catalog (hms_catalog) whose data is stored on S3:
    ```sql
    CREATE CATALOG hms_catalog PROPERTIES (
        "type" = "hms",
        "hive.metastore.uris" = "thrift://localhost:9083",
        "AWS_ACCESS_KEY" = "your access key",
        "AWS_SECRET_KEY" = "your secret key",
        "AWS_ENDPOINT" = "s3 endpoint",
        "AWS_REGION" = "s3-region",
        "fs.s3a.paging.maximum" = "1000"
    );
    ```
    All of these properties are required. A minimal usage sketch follows the file summary below.
---
 .gitignore                                         |  1 +
 be/src/io/file_factory.cpp                         | 52 ++++++++++++++++++++++
 be/src/io/file_factory.h                           | 14 ++++++
 be/src/vec/exec/file_arrow_scanner.cpp             | 10 +++--
 be/src/vec/exec/file_text_scanner.cpp              |  8 ++--
 bin/start_be.sh                                    | 10 +++++
 .../java/org/apache/doris/catalog/HiveTable.java   |  5 +++
 .../doris/datasource/DataSourceProperty.java       | 24 ++++++++++
 .../planner/external/ExternalFileScanNode.java     | 33 +++++++++-----
 .../planner/external/ExternalFileScanProvider.java |  4 +-
 .../planner/external/ExternalHiveScanProvider.java | 46 ++++++++++++++-----
 .../external/ExternalIcebergScanProvider.java      | 14 ++----
 gensrc/thrift/PlanNodes.thrift                     |  2 +
 13 files changed, 181 insertions(+), 42 deletions(-)
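
With such a catalog in place, the feature can be exercised over Doris's MySQL protocol. Below is a minimal JDBC sketch, assuming a FE listening on 127.0.0.1:9030 (the default query port), MySQL Connector/J on the classpath, and a hypothetical Hive table s3_db.s3_table whose files live on S3; whether three-part catalog.db.table names resolve at this stage depends on the rest of the multi-catalog work.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class S3CatalogSmokeTest {
    public static void main(String[] args) throws Exception {
        // Doris speaks the MySQL protocol; 9030 is the FE's default query port.
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/", "root", "");
             Statement stmt = conn.createStatement()) {
            // Same statement as in the commit message above.
            stmt.execute("CREATE CATALOG hms_catalog PROPERTIES ("
                    + "\"type\" = \"hms\","
                    + "\"hive.metastore.uris\" = \"thrift://localhost:9083\","
                    + "\"AWS_ACCESS_KEY\" = \"your access key\","
                    + "\"AWS_SECRET_KEY\" = \"your secret key\","
                    + "\"AWS_ENDPOINT\" = \"s3 endpoint\","
                    + "\"AWS_REGION\" = \"s3-region\","
                    + "\"fs.s3a.paging.maximum\" = \"1000\")");
            // Hypothetical Hive table whose files live on S3, read through the new catalog.
            try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM hms_catalog.s3_db.s3_table")) {
                while (rs.next()) {
                    System.out.println("rows: " + rs.getLong(1));
                }
            }
        }
    }
}
```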

diff --git a/.gitignore b/.gitignore
index c8cf1a765b..e743211fac 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,6 +33,7 @@ package-lock.json
 # ignore project file
 .cproject
 .project
+.cache
 .settings/
 **/.idea/
 **/.vscode/
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 9937eaa6d5..46d19669b9 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -130,3 +130,55 @@ doris::Status doris::FileFactory::create_file_reader(
     }
     return Status::OK();
 }
+
+doris::Status doris::FileFactory::_new_file_reader(doris::ExecEnv* env, RuntimeProfile* profile,
+                                                   const TFileScanRangeParams& params,
+                                                   const doris::TFileRangeDesc& range,
+                                                   FileReader*& file_reader_ptr) {
+    doris::TFileType::type type = params.file_type;
+
+    if (type == TFileType::FILE_STREAM) {
+        return Status::InternalError("UnSupport UniquePtr For FileStream type");
+    }
+
+    switch (type) {
+    case TFileType::FILE_S3: {
+        file_reader_ptr = new BufferedReader(
+                profile, new S3Reader(params.properties, range.path, range.start_offset));
+        break;
+    }
+    case TFileType::FILE_HDFS: {
+        FileReader* hdfs_reader = nullptr;
+        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(params.hdfs_params, range.path,
+                                                        range.start_offset, &hdfs_reader));
+        file_reader_ptr = new BufferedReader(profile, hdfs_reader);
+        break;
+    }
+    default:
+        return Status::InternalError("UnSupport File Reader Type: " + std::to_string(type));
+    }
+
+    return Status::OK();
+}
+
+doris::Status doris::FileFactory::create_file_reader(doris::ExecEnv* env, RuntimeProfile* profile,
+                                                     const TFileScanRangeParams& params,
+                                                     const doris::TFileRangeDesc& range,
+                                                     std::shared_ptr<FileReader>& file_reader) {
+    FileReader* file_reader_ptr;
+    RETURN_IF_ERROR(_new_file_reader(env, profile, params, range, file_reader_ptr));
+    file_reader.reset(file_reader_ptr);
+
+    return Status::OK();
+}
+
+doris::Status doris::FileFactory::create_file_reader(doris::ExecEnv* env, RuntimeProfile* profile,
+                                                     const TFileScanRangeParams& params,
+                                                     const doris::TFileRangeDesc& range,
+                                                     std::unique_ptr<FileReader>& file_reader) {
+    FileReader* file_reader_ptr;
+    RETURN_IF_ERROR(_new_file_reader(env, profile, params, range, file_reader_ptr));
+    file_reader.reset(file_reader_ptr);
+
+    return Status::OK();
+}
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index c1350d6682..c36b611f9e 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -48,6 +48,16 @@ public:
                                      const TBrokerRangeDesc& range, int64_t start_offset,
                                      std::shared_ptr<FileReader>& file_reader);
 
+    static Status create_file_reader(ExecEnv* env, RuntimeProfile* profile,
+                                     const TFileScanRangeParams& params,
+                                     const TFileRangeDesc& range,
+                                     std::unique_ptr<FileReader>& file_reader);
+
+    static Status create_file_reader(ExecEnv* env, RuntimeProfile* profile,
+                                     const TFileScanRangeParams& params,
+                                     const TFileRangeDesc& range,
+                                     std::shared_ptr<FileReader>& file_reader);
+
     static TFileType::type convert_storage_type(TStorageBackendType::type type) {
         switch (type) {
         case TStorageBackendType::LOCAL:
@@ -72,6 +82,10 @@ private:
                                    const std::map<std::string, std::string>& properties,
                                    const TBrokerRangeDesc& range, int64_t start_offset,
                                    FileReader*& file_reader);
+
+    static Status _new_file_reader(ExecEnv* env, RuntimeProfile* profile,
+                                   const TFileScanRangeParams& params, const TFileRangeDesc& range,
+                                   FileReader*& file_reader);
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 65b4a032ad..bd1428e820 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -17,8 +17,11 @@
 
 #include "vec/exec/file_arrow_scanner.h"
 
+#include <memory>
+
 #include "exec/arrow/parquet_reader.h"
 #include "io/buffered_reader.h"
+#include "io/file_factory.h"
 #include "io/hdfs_reader_writer.h"
 #include "runtime/descriptors.h"
 #include "vec/utils/arrow_column_to_doris_column.h"
@@ -54,10 +57,9 @@ Status FileArrowScanner::_open_next_reader() {
         }
         const TFileRangeDesc& range = _ranges[_next_range++];
         std::unique_ptr<FileReader> file_reader;
-        FileReader* hdfs_reader = nullptr;
-        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
-                                                        range.start_offset, &hdfs_reader));
-        file_reader.reset(new BufferedReader(_profile, hdfs_reader));
+
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params,
+                                                        range, file_reader));
         RETURN_IF_ERROR(file_reader->open());
         if (file_reader->size() == 0) {
             file_reader->close();
diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp
index c7d95e45ab..593b78867f 100644
--- a/be/src/vec/exec/file_text_scanner.cpp
+++ b/be/src/vec/exec/file_text_scanner.cpp
@@ -28,6 +28,7 @@
 #include "exec/text_converter.hpp"
 #include "exprs/expr_context.h"
 #include "io/buffered_reader.h"
+#include "io/file_factory.h"
 #include "io/hdfs_reader_writer.h"
 #include "util/types.h"
 #include "util/utf8_check.h"
@@ -152,11 +153,8 @@ Status FileTextScanner::_open_next_reader() {
 
 Status FileTextScanner::_open_file_reader() {
     const TFileRangeDesc& range = _ranges[_next_range];
-
-    FileReader* hdfs_reader = nullptr;
-    RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
-                                                    range.start_offset, &hdfs_reader));
-    _cur_file_reader.reset(new BufferedReader(_profile, hdfs_reader));
+    RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range,
+                                                    _cur_file_reader));
     return _cur_file_reader->open();
 }
 
diff --git a/bin/start_be.sh b/bin/start_be.sh
index 74e1b49c14..551a12e5c0 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -31,12 +31,17 @@ OPTS=$(getopt \
 eval set -- "$OPTS"
 
 RUN_DAEMON=0
+RUN_IN_AWS=0
 while true; do
     case "$1" in
     --daemon)
         RUN_DAEMON=1
         shift
         ;;
+    --aws)
+        RUN_IN_AWS=1
+        shift
+        ;;
     --)
         shift
         break
@@ -170,6 +175,11 @@ else
     LIMIT="/bin/limit3 -c 0 -n 65536"
 fi
 
+## If you are not running in aws cloud, disable this env since https://github.com/aws/aws-sdk-cpp/issues/1410.
+if [ ${RUN_IN_AWS} -eq 0 ]; then
+    export AWS_EC2_METADATA_DISABLED=true
+fi
+
 if [ ${RUN_DAEMON} -eq 1 ]; then
     nohup $LIMIT ${DORIS_HOME}/lib/doris_be "$@" >> $LOG_DIR/be.out 2>&1 < /dev/null &
 else
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
index c04017ef7b..e512f33c64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -53,10 +53,15 @@ public class HiveTable extends Table {
     public static final String HIVE_TABLE = "table";
     public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
     public static final String HIVE_HDFS_PREFIX = "dfs.";
+    public static final String S3_FS_PREFIX = "fs.s3";
     public static final String S3_PROPERTIES_PREFIX = "AWS";
     public static final String S3_AK = "AWS_ACCESS_KEY";
     public static final String S3_SK = "AWS_SECRET_KEY";
     public static final String S3_ENDPOINT = "AWS_ENDPOINT";
+    public static final String AWS_REGION = "AWS_REGION";
+    public static final String AWS_MAX_CONN_SIZE = "AWS_MAX_CONN_SIZE";
+    public static final String AWS_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
+    public static final String AWS_CONN_TIMEOUT_MS = "AWS_CONN_TIMEOUT_MS";
 
     public HiveTable() {
         super(TableType.HIVE);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
index 1351e29a11..0d3e67c5fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
@@ -57,16 +57,40 @@ public class DataSourceProperty implements Writable {
         Map<String, String> s3Properties = Maps.newHashMap();
         if (properties.containsKey(HiveTable.S3_AK)) {
             s3Properties.put("fs.s3a.access.key", properties.get(HiveTable.S3_AK));
+            s3Properties.put(HiveTable.S3_AK, properties.get(HiveTable.S3_AK));
         }
         if (properties.containsKey(HiveTable.S3_SK)) {
             s3Properties.put("fs.s3a.secret.key", properties.get(HiveTable.S3_SK));
+            s3Properties.put(HiveTable.S3_SK, properties.get(HiveTable.S3_SK));
         }
         if (properties.containsKey(HiveTable.S3_ENDPOINT)) {
             s3Properties.put("fs.s3a.endpoint", properties.get(HiveTable.S3_ENDPOINT));
+            s3Properties.put(HiveTable.S3_ENDPOINT, properties.get(HiveTable.S3_ENDPOINT));
+        }
+        if (properties.containsKey(HiveTable.AWS_REGION)) {
+            s3Properties.put("fs.s3a.endpoint.region", properties.get(HiveTable.AWS_REGION));
+            s3Properties.put(HiveTable.AWS_REGION, properties.get(HiveTable.AWS_REGION));
+        }
+        if (properties.containsKey(HiveTable.AWS_MAX_CONN_SIZE)) {
+            s3Properties.put("fs.s3a.connection.maximum", properties.get(HiveTable.AWS_MAX_CONN_SIZE));
+            s3Properties.put(HiveTable.AWS_MAX_CONN_SIZE, properties.get(HiveTable.AWS_MAX_CONN_SIZE));
+        }
+        if (properties.containsKey(HiveTable.AWS_REQUEST_TIMEOUT_MS)) {
+            s3Properties.put("fs.s3a.connection.request.timeout", properties.get(HiveTable.AWS_REQUEST_TIMEOUT_MS));
+            s3Properties.put(HiveTable.AWS_REQUEST_TIMEOUT_MS, properties.get(HiveTable.AWS_REQUEST_TIMEOUT_MS));
+        }
+        if (properties.containsKey(HiveTable.AWS_CONN_TIMEOUT_MS)) {
+            s3Properties.put("fs.s3a.connection.timeout", properties.get(HiveTable.AWS_CONN_TIMEOUT_MS));
+            s3Properties.put(HiveTable.AWS_CONN_TIMEOUT_MS, properties.get(HiveTable.AWS_CONN_TIMEOUT_MS));
         }
         s3Properties.put("fs.s3.impl.disable.cache", "true");
         s3Properties.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
         s3Properties.put("fs.s3a.attempts.maximum", "2");
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().startsWith(HiveTable.S3_FS_PREFIX)) {
+                s3Properties.put(entry.getKey(), entry.getValue());
+            }
+        }
         return s3Properties;
     }
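
The hunk above forwards each setting twice: once under its raw AWS_* name (these end up in the new TFileScanRangeParams.properties map sent to the BE) and once under the matching fs.s3a.* key used by the Hadoop S3A FileSystem when the FE lists splits, while any user-supplied fs.s3* key such as fs.s3a.paging.maximum is passed through verbatim. A standalone sketch of that translation, using a hypothetical translateS3Properties helper rather than the DataSourceProperty code itself:

```java
import java.util.HashMap;
import java.util.Map;

public class S3PropertyTranslation {
    // Pairs of (Doris catalog property, Hadoop s3a configuration key).
    private static final String[][] KEY_PAIRS = {
            {"AWS_ACCESS_KEY", "fs.s3a.access.key"},
            {"AWS_SECRET_KEY", "fs.s3a.secret.key"},
            {"AWS_ENDPOINT", "fs.s3a.endpoint"},
            {"AWS_REGION", "fs.s3a.endpoint.region"},
            {"AWS_MAX_CONN_SIZE", "fs.s3a.connection.maximum"},
            {"AWS_REQUEST_TIMEOUT_MS", "fs.s3a.connection.request.timeout"},
            {"AWS_CONN_TIMEOUT_MS", "fs.s3a.connection.timeout"},
    };

    /** Hypothetical helper mirroring the dual mapping done in the patch. */
    static Map<String, String> translateS3Properties(Map<String, String> catalogProps) {
        Map<String, String> s3Props = new HashMap<>();
        for (String[] pair : KEY_PAIRS) {
            String value = catalogProps.get(pair[0]);
            if (value != null) {
                s3Props.put(pair[0], value); // raw AWS_* key, shipped to the BE
                s3Props.put(pair[1], value); // fs.s3a.* key, read by the S3A FileSystem
            }
        }
        // Any user-supplied fs.s3* key (e.g. fs.s3a.paging.maximum) is passed through untouched.
        catalogProps.forEach((k, v) -> {
            if (k.startsWith("fs.s3")) {
                s3Props.put(k, v);
            }
        });
        return s3Props;
    }

    public static void main(String[] args) {
        Map<String, String> catalogProps = new HashMap<>();
        catalogProps.put("AWS_ACCESS_KEY", "ak");
        catalogProps.put("AWS_REGION", "us-east-1");
        catalogProps.put("fs.s3a.paging.maximum", "1000");
        translateS3Properties(catalogProps).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```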
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 78ef63f257..9fbb8fa3fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -74,7 +74,8 @@ import java.util.Random;
 import java.util.Set;
 
 /**
- * ExternalFileScanNode for the file access type of datasource, now only support hive,hudi and iceberg.
+ * ExternalFileScanNode for the file access type of datasource, now only support
+ * hive,hudi and iceberg.
  */
 public class ExternalFileScanNode extends ExternalScanNode {
     private static final Logger LOG = LogManager.getLogger(ExternalFileScanNode.class);
@@ -219,9 +220,11 @@ public class ExternalFileScanNode extends ExternalScanNode {
         if (scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) {
             Map<String, String> serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
             String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim"))
-                    ? HIVE_DEFAULT_COLUMN_SEPARATOR : serDeInfoParams.get("field.delim");
+                    ? HIVE_DEFAULT_COLUMN_SEPARATOR
+                    : serDeInfoParams.get("field.delim");
             String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim"))
-                    ? HIVE_DEFAULT_LINE_DELIMITER : serDeInfoParams.get("line.delim");
+                    ? HIVE_DEFAULT_LINE_DELIMITER
+                    : serDeInfoParams.get("line.delim");
 
             TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
             textParams.setLineDelimiterStr(lineDelimiter);
@@ -246,7 +249,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
             slotDescByName.put(column.getName(), slotDesc);
         }
 
-        // Hive table must extract partition value from path and hudi/iceberg table keep partition field in file.
+        // Hive table must extract partition value from path and hudi/iceberg table keep
+        // partition field in file.
         partitionKeys.addAll(scanProvider.getPathPartitionKeys());
         context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
         for (SlotDescriptor slot : desc.getSlots()) {
@@ -276,14 +280,14 @@ public class ExternalFileScanNode extends ExternalScanNode {
     // If fileFormat is not null, we use fileFormat instead of check file's suffix
     private void buildScanRange() throws UserException, IOException {
         scanRangeLocations = Lists.newArrayList();
-        InputSplit[] inputSplits = scanProvider.getSplits(conjuncts);
-        if (0 == inputSplits.length) {
+        List<InputSplit> inputSplits = scanProvider.getSplits(conjuncts);
+        if (inputSplits.isEmpty()) {
             return;
         }
-        inputSplitsNum = inputSplits.length;
+        inputSplitsNum = inputSplits.size();
 
-        String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
-        String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
+        String fullPath = ((FileSplit) inputSplits.get(0)).getPath().toUri().toString();
+        String filePath = ((FileSplit) inputSplits.get(0)).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
         context.params.setFileType(scanProvider.getTableFileType());
         context.params.setFormatType(scanProvider.getTableFormatType());
@@ -292,6 +296,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
             THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties());
             tHdfsParams.setFsName(fsName);
             context.params.setHdfsParams(tHdfsParams);
+        } else if (scanProvider.getTableFileType() == TFileType.FILE_S3) {
+            context.params.setProperties(hmsTable.getS3Properties());
         }
 
         TScanRangeLocations curLocations = newLocations(context.params);
@@ -312,6 +318,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
                     + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")"
                     + " loaction: " + Joiner.on("|").join(split.getLocations()));
 
+
             fileSplitStrategy.update(fileSplit);
             // Add a new location when it's can be split
             if (fileSplitStrategy.hasNext()) {
@@ -353,10 +360,15 @@ public class ExternalFileScanNode extends ExternalScanNode {
             FileSplit fileSplit,
             List<String> columnsFromPath) throws DdlException, MetaNotFoundException {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
-        rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
         rangeDesc.setStartOffset(fileSplit.getStart());
         rangeDesc.setSize(fileSplit.getLength());
         rangeDesc.setColumnsFromPath(columnsFromPath);
+
+        if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
+            rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
+        } else if (scanProvider.getTableFileType() == TFileType.FILE_S3) {
+            rangeDesc.setPath(fileSplit.getPath().toString());
+        }
         return rangeDesc;
     }
 
@@ -417,4 +429,3 @@ public class ExternalFileScanNode extends ExternalScanNode {
         return output.toString();
     }
 }
-
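
The file-type switch around setPath above exists because toUri().getPath() strips the scheme and authority: for HDFS that information travels separately in THdfsParams.fsName, but for S3 the bucket would be lost, so the full URI is sent instead. A JDK-only sketch of the difference, using hypothetical example paths (java.net.URI behaves the same way here as Hadoop's Path.toUri()):

```java
import java.net.URI;

public class SplitPathForms {
    public static void main(String[] args) {
        URI hdfs = URI.create("hdfs://nameservice1/warehouse/db.db/tbl/part-00000.orc");
        URI s3 = URI.create("s3a://my-bucket/warehouse/db.db/tbl/part-00000.orc");

        // HDFS: only the path is sent; scheme and authority travel in THdfsParams.fsName.
        System.out.println(hdfs.getPath());                                  // /warehouse/db.db/tbl/part-00000.orc
        System.out.println(hdfs.getScheme() + "://" + hdfs.getAuthority()); // hdfs://nameservice1

        // S3: the full URI is sent, because getPath() alone would drop the bucket.
        System.out.println(s3.getPath());   // /warehouse/db.db/tbl/part-00000.orc (bucket gone)
        System.out.println(s3.toString());  // s3a://my-bucket/warehouse/db.db/tbl/part-00000.orc
    }
}
```
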
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
index ebdd7a768a..36e11d8845 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
@@ -37,11 +37,11 @@ import java.util.Map;
 public interface ExternalFileScanProvider {
     TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException;
 
-    TFileType getTableFileType();
+    TFileType getTableFileType() throws DdlException, MetaNotFoundException;
 
     String getMetaStoreUrl();
 
-    InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException;
+    List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException;
 
     Table getRemoteHiveTable() throws DdlException, MetaNotFoundException;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
index c60ee3c211..849511b0bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
@@ -27,6 +27,7 @@ import org.apache.doris.external.hive.util.HiveUtil;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -39,6 +40,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -69,8 +71,16 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider {
     }
 
     @Override
-    public TFileType getTableFileType() {
-        return TFileType.FILE_HDFS;
+    public TFileType getTableFileType() throws DdlException, MetaNotFoundException {
+        String location = hmsTable.getRemoteTable().getSd().getLocation();
+        if (location != null && !location.isEmpty()) {
+            if (location.startsWith("s3a") || location.startsWith("s3n")) {
+                return TFileType.FILE_S3;
+            } else if (location.startsWith("hdfs:")) {
+                return TFileType.FILE_HDFS;
+            }
+        }
+        throw new DdlException("Unknown file type for hms table.");
     }
 
     @Override
@@ -79,34 +89,50 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider {
     }
 
     @Override
-    public InputSplit[] getSplits(List<Expr> exprs)
+    public List<InputSplit> getSplits(List<Expr> exprs)
             throws IOException, UserException {
         String splitsPath = getRemoteHiveTable().getSd().getLocation();
         List<String> partitionKeys = getRemoteHiveTable().getPartitionKeys()
                 .stream().map(FieldSchema::getName).collect(Collectors.toList());
+        List<Partition> hivePartitions = new ArrayList<>();
 
         if (partitionKeys.size() > 0) {
             ExprNodeGenericFuncDesc hivePartitionPredicate = HiveMetaStoreClientHelper.convertToHivePartitionExpr(
                     exprs, partitionKeys, hmsTable.getName());
 
             String metaStoreUris = getMetaStoreUrl();
-            List<Partition> hivePartitions = HiveMetaStoreClientHelper.getHivePartitions(
-                    metaStoreUris,  getRemoteHiveTable(), hivePartitionPredicate);
-            if (!hivePartitions.isEmpty()) {
-                splitsPath = hivePartitions.stream().map(x -> x.getSd().getLocation())
-                        .collect(Collectors.joining(","));
-            }
+            hivePartitions.addAll(HiveMetaStoreClientHelper.getHivePartitions(
+                    metaStoreUris,  getRemoteHiveTable(), hivePartitionPredicate));
         }
 
         String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
 
         Configuration configuration = setConfiguration();
         InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
+        if (!hivePartitions.isEmpty()) {
+            return hivePartitions.parallelStream()
+                    .flatMap(x -> getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream())
+                    .collect(Collectors.toList());
+        } else {
+            return getSplitsByPath(inputFormat, configuration, splitsPath);
+        }
+    }
+
+    private List<InputSplit> getSplitsByPath(
+            InputFormat<?, ?> inputFormat,
+            Configuration configuration,
+            String splitsPath) {
         JobConf jobConf = new JobConf(configuration);
         FileInputFormat.setInputPaths(jobConf, splitsPath);
-        return inputFormat.getSplits(jobConf, 0);
+        try {
+            InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
+            return Lists.newArrayList(splits);
+        } catch (IOException e) {
+            return new ArrayList<InputSplit>();
+        }
     }
 
+
     protected Configuration setConfiguration() {
         Configuration conf = new Configuration();
         Map<String, String> dfsProperties = hmsTable.getDfsProperties();
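
getTableFileType above is the routing decision for the whole scan: locations rooted at s3a or s3n go to the S3 reader, locations rooted at hdfs: stay on the HDFS reader, and anything else is rejected. A standalone sketch of the same dispatch, with a local FileKind enum standing in for the thrift TFileType:

```java
public class StorageDispatch {
    /** Local stand-in for the thrift TFileType values used by the scan node. */
    enum FileKind { FILE_HDFS, FILE_S3 }

    /** Mirrors the prefix test in ExternalHiveScanProvider.getTableFileType(). */
    static FileKind fileKindOf(String location) {
        if (location == null || location.isEmpty()) {
            throw new IllegalArgumentException("Unknown file type for hms table.");
        }
        if (location.startsWith("s3a") || location.startsWith("s3n")) {
            return FileKind.FILE_S3;
        }
        if (location.startsWith("hdfs:")) {
            return FileKind.FILE_HDFS;
        }
        throw new IllegalArgumentException("Unknown file type for hms table: " + location);
    }

    public static void main(String[] args) {
        System.out.println(fileKindOf("s3a://bucket/warehouse/db.db/tbl")); // FILE_S3
        System.out.println(fileKindOf("hdfs://ns1/warehouse/db.db/tbl"));   // FILE_HDFS
        // A plain s3:// location would currently fall through to the error branch.
    }
}
```
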
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
index 83ddc86c4d..10fd85b242 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
@@ -24,7 +24,6 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileType;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -56,7 +55,7 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
     public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException {
         TFileFormatType type;
 
-        String icebergFormat  = getRemoteHiveTable().getParameters()
+        String icebergFormat = getRemoteHiveTable().getParameters()
                 .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
         if (icebergFormat.equals("parquet")) {
             type = TFileFormatType.FORMAT_PARQUET;
@@ -69,12 +68,7 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
     }
 
     @Override
-    public TFileType getTableFileType() {
-        return TFileType.FILE_HDFS;
-    }
-
-    @Override
-    public InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException {
+    public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException {
         List<Expression> expressions = new ArrayList<>();
         for (Expr conjunct : exprs) {
             Expression expression = IcebergUtils.convertToIcebergExpr(conjunct);
@@ -88,7 +82,7 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
         for (Expression predicate : expressions) {
             scan = scan.filter(predicate);
         }
-        List<FileSplit> splits = new ArrayList<>();
+        List<InputSplit> splits = new ArrayList<>();
 
         for (FileScanTask task : scan.planFiles()) {
             for (FileScanTask spitTask : task.split(128 * 1024 * 1024)) {
@@ -96,7 +90,7 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
                         spitTask.start(), spitTask.length(), new String[0]));
             }
         }
-        return splits.toArray(new InputSplit[0]);
+        return splits;
     }
 
     private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index b6d0a3b19a..54f50085a9 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -237,6 +237,8 @@ struct TFileScanRangeParams {
 
   6: optional THdfsParams hdfs_params;
   7: optional TFileTextScanRangeParams text_params;
+  // properties for file such as s3 information
+  8: optional map<string, string> properties;
 }
 
 struct TFileRangeDesc {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org