Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/21 09:38:58 UTC
[doris] branch master updated: [feature-wip](multi-catalog) Support s3 storage for file scan node (#10977)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7147a7c290 [feature-wip](multi-catalog) Support s3 storage for file scan node (#10977)
7147a7c290 is described below
commit 7147a7c2906bdc488732a8c742db59ff0adcfa25
Author: huangzhaowei <hu...@bytedance.com>
AuthorDate: Thu Jul 21 17:38:53 2022 +0800
[feature-wip](multi-catalog) Support s3 storage for file scan node (#10977)
Here is an example of creating an hms catalog that uses S3 storage:
```sql
CREATE CATALOG hms_catalog PROPERTIES (
    "type" = "hms",
    "hive.metastore.uris" = "thrift://localhost:9083",
    "AWS_ACCESS_KEY" = "your access key",
    "AWS_SECRET_KEY" = "your secret key",
    "AWS_ENDPOINT" = "s3 endpoint",
    "AWS_REGION" = "s3-region",
    "fs.s3a.paging.maximum" = "1000");
```
All of these properties are required.
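Once created, the catalog can be queried like the internal catalog. A minimal sketch, assuming the Hive metastore contains a database `hive_db` with an S3-backed table `hive_tbl` (both names are illustrative):
```sql
-- Switch to the new catalog and read an S3-backed Hive table.
SWITCH hms_catalog;
USE hive_db;
SELECT * FROM hive_tbl LIMIT 10;
```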
---
.gitignore | 1 +
be/src/io/file_factory.cpp | 52 ++++++++++++++++++++++
be/src/io/file_factory.h | 14 ++++++
be/src/vec/exec/file_arrow_scanner.cpp | 10 +++--
be/src/vec/exec/file_text_scanner.cpp | 8 ++--
bin/start_be.sh | 10 +++++
.../java/org/apache/doris/catalog/HiveTable.java | 5 +++
.../doris/datasource/DataSourceProperty.java | 24 ++++++++++
.../planner/external/ExternalFileScanNode.java | 33 +++++++++-----
.../planner/external/ExternalFileScanProvider.java | 4 +-
.../planner/external/ExternalHiveScanProvider.java | 46 ++++++++++++++-----
.../external/ExternalIcebergScanProvider.java | 14 ++----
gensrc/thrift/PlanNodes.thrift | 2 +
13 files changed, 181 insertions(+), 42 deletions(-)
diff --git a/.gitignore b/.gitignore
index c8cf1a765b..e743211fac 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,6 +33,7 @@ package-lock.json
# ignore project file
.cproject
.project
+.cache
.settings/
**/.idea/
**/.vscode/
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 9937eaa6d5..46d19669b9 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -130,3 +130,55 @@ doris::Status doris::FileFactory::create_file_reader(
}
return Status::OK();
}
+
+doris::Status doris::FileFactory::_new_file_reader(doris::ExecEnv* env, RuntimeProfile* profile,
+ const TFileScanRangeParams& params,
+ const doris::TFileRangeDesc& range,
+ FileReader*& file_reader_ptr) {
+ doris::TFileType::type type = params.file_type;
+
+ if (type == TFileType::FILE_STREAM) {
+ return Status::InternalError("UnSupport UniquePtr For FileStream type");
+ }
+
+ switch (type) {
+ case TFileType::FILE_S3: {
+ file_reader_ptr = new BufferedReader(
+ profile, new S3Reader(params.properties, range.path, range.start_offset));
+ break;
+ }
+ case TFileType::FILE_HDFS: {
+ FileReader* hdfs_reader = nullptr;
+ RETURN_IF_ERROR(HdfsReaderWriter::create_reader(params.hdfs_params, range.path,
+ range.start_offset, &hdfs_reader));
+ file_reader_ptr = new BufferedReader(profile, hdfs_reader);
+ break;
+ }
+ default:
+ return Status::InternalError("UnSupport File Reader Type: " + std::to_string(type));
+ }
+
+ return Status::OK();
+}
+
+doris::Status doris::FileFactory::create_file_reader(doris::ExecEnv* env, RuntimeProfile* profile,
+ const TFileScanRangeParams& params,
+ const doris::TFileRangeDesc& range,
+ std::shared_ptr<FileReader>& file_reader) {
+ FileReader* file_reader_ptr;
+ RETURN_IF_ERROR(_new_file_reader(env, profile, params, range, file_reader_ptr));
+ file_reader.reset(file_reader_ptr);
+
+ return Status::OK();
+}
+
+doris::Status doris::FileFactory::create_file_reader(doris::ExecEnv* env, RuntimeProfile* profile,
+ const TFileScanRangeParams& params,
+ const doris::TFileRangeDesc& range,
+ std::unique_ptr<FileReader>& file_reader) {
+ FileReader* file_reader_ptr;
+ RETURN_IF_ERROR(_new_file_reader(env, profile, params, range, file_reader_ptr));
+ file_reader.reset(file_reader_ptr);
+
+ return Status::OK();
+}
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index c1350d6682..c36b611f9e 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -48,6 +48,16 @@ public:
const TBrokerRangeDesc& range, int64_t start_offset,
std::shared_ptr<FileReader>& file_reader);
+ static Status create_file_reader(ExecEnv* env, RuntimeProfile* profile,
+ const TFileScanRangeParams& params,
+ const TFileRangeDesc& range,
+ std::unique_ptr<FileReader>& file_reader);
+
+ static Status create_file_reader(ExecEnv* env, RuntimeProfile* profile,
+ const TFileScanRangeParams& params,
+ const TFileRangeDesc& range,
+ std::shared_ptr<FileReader>& file_reader);
+
static TFileType::type convert_storage_type(TStorageBackendType::type type) {
switch (type) {
case TStorageBackendType::LOCAL:
@@ -72,6 +82,10 @@ private:
const std::map<std::string, std::string>& properties,
const TBrokerRangeDesc& range, int64_t start_offset,
FileReader*& file_reader);
+
+ static Status _new_file_reader(ExecEnv* env, RuntimeProfile* profile,
+ const TFileScanRangeParams& params, const TFileRangeDesc& range,
+ FileReader*& file_reader);
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 65b4a032ad..bd1428e820 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -17,8 +17,11 @@
#include "vec/exec/file_arrow_scanner.h"
+#include <memory>
+
#include "exec/arrow/parquet_reader.h"
#include "io/buffered_reader.h"
+#include "io/file_factory.h"
#include "io/hdfs_reader_writer.h"
#include "runtime/descriptors.h"
#include "vec/utils/arrow_column_to_doris_column.h"
@@ -54,10 +57,9 @@ Status FileArrowScanner::_open_next_reader() {
}
const TFileRangeDesc& range = _ranges[_next_range++];
std::unique_ptr<FileReader> file_reader;
- FileReader* hdfs_reader = nullptr;
- RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
- range.start_offset, &hdfs_reader));
- file_reader.reset(new BufferedReader(_profile, hdfs_reader));
+
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params,
+ range, file_reader));
RETURN_IF_ERROR(file_reader->open());
if (file_reader->size() == 0) {
file_reader->close();
diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp
index c7d95e45ab..593b78867f 100644
--- a/be/src/vec/exec/file_text_scanner.cpp
+++ b/be/src/vec/exec/file_text_scanner.cpp
@@ -28,6 +28,7 @@
#include "exec/text_converter.hpp"
#include "exprs/expr_context.h"
#include "io/buffered_reader.h"
+#include "io/file_factory.h"
#include "io/hdfs_reader_writer.h"
#include "util/types.h"
#include "util/utf8_check.h"
@@ -152,11 +153,8 @@ Status FileTextScanner::_open_next_reader() {
Status FileTextScanner::_open_file_reader() {
const TFileRangeDesc& range = _ranges[_next_range];
-
- FileReader* hdfs_reader = nullptr;
- RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
- range.start_offset, &hdfs_reader));
- _cur_file_reader.reset(new BufferedReader(_profile, hdfs_reader));
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range,
+ _cur_file_reader));
return _cur_file_reader->open();
}
diff --git a/bin/start_be.sh b/bin/start_be.sh
index 74e1b49c14..551a12e5c0 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -31,12 +31,17 @@ OPTS=$(getopt \
eval set -- "$OPTS"
RUN_DAEMON=0
+RUN_IN_AWS=0
while true; do
case "$1" in
--daemon)
RUN_DAEMON=1
shift
;;
+ --aws)
+ RUN_IN_AWS=1
+ shift
+ ;;
--)
shift
break
@@ -170,6 +175,11 @@ else
LIMIT="/bin/limit3 -c 0 -n 65536"
fi
+## If the BE is not running on AWS cloud, disable the EC2 metadata lookup to work around https://github.com/aws/aws-sdk-cpp/issues/1410.
+if [ ${RUN_IN_AWS} -eq 0 ]; then
+ export AWS_EC2_METADATA_DISABLED=true
+fi
+
if [ ${RUN_DAEMON} -eq 1 ]; then
nohup $LIMIT ${DORIS_HOME}/lib/doris_be "$@" >> $LOG_DIR/be.out 2>&1 < /dev/null &
else
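Usage note: when the BE does run on AWS and should pick up instance-profile credentials, start it with the new flag, e.g. `sh bin/start_be.sh --daemon --aws`.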
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
index c04017ef7b..e512f33c64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -53,10 +53,15 @@ public class HiveTable extends Table {
public static final String HIVE_TABLE = "table";
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
public static final String HIVE_HDFS_PREFIX = "dfs.";
+ public static final String S3_FS_PREFIX = "fs.s3";
public static final String S3_PROPERTIES_PREFIX = "AWS";
public static final String S3_AK = "AWS_ACCESS_KEY";
public static final String S3_SK = "AWS_SECRET_KEY";
public static final String S3_ENDPOINT = "AWS_ENDPOINT";
+ public static final String AWS_REGION = "AWS_REGION";
+ public static final String AWS_MAX_CONN_SIZE = "AWS_MAX_CONN_SIZE";
+ public static final String AWS_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
+ public static final String AWS_CONN_TIMEOUT_MS = "AWS_CONN_TIMEOUT_MS";
public HiveTable() {
super(TableType.HIVE);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
index 1351e29a11..0d3e67c5fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
@@ -57,16 +57,40 @@ public class DataSourceProperty implements Writable {
Map<String, String> s3Properties = Maps.newHashMap();
if (properties.containsKey(HiveTable.S3_AK)) {
s3Properties.put("fs.s3a.access.key", properties.get(HiveTable.S3_AK));
+ s3Properties.put(HiveTable.S3_AK, properties.get(HiveTable.S3_AK));
}
if (properties.containsKey(HiveTable.S3_SK)) {
s3Properties.put("fs.s3a.secret.key", properties.get(HiveTable.S3_SK));
+ s3Properties.put(HiveTable.S3_SK, properties.get(HiveTable.S3_SK));
}
if (properties.containsKey(HiveTable.S3_ENDPOINT)) {
s3Properties.put("fs.s3a.endpoint", properties.get(HiveTable.S3_ENDPOINT));
+ s3Properties.put(HiveTable.S3_ENDPOINT, properties.get(HiveTable.S3_ENDPOINT));
+ }
+ if (properties.containsKey(HiveTable.AWS_REGION)) {
+ s3Properties.put("fs.s3a.endpoint.region", properties.get(HiveTable.AWS_REGION));
+ s3Properties.put(HiveTable.AWS_REGION, properties.get(HiveTable.AWS_REGION));
+ }
+ if (properties.containsKey(HiveTable.AWS_MAX_CONN_SIZE)) {
+ s3Properties.put("fs.s3a.connection.maximum", properties.get(HiveTable.AWS_MAX_CONN_SIZE));
+ s3Properties.put(HiveTable.AWS_MAX_CONN_SIZE, properties.get(HiveTable.AWS_MAX_CONN_SIZE));
+ }
+ if (properties.containsKey(HiveTable.AWS_REQUEST_TIMEOUT_MS)) {
+ s3Properties.put("fs.s3a.connection.request.timeout", properties.get(HiveTable.AWS_REQUEST_TIMEOUT_MS));
+ s3Properties.put(HiveTable.AWS_REQUEST_TIMEOUT_MS, properties.get(HiveTable.AWS_REQUEST_TIMEOUT_MS));
+ }
+ if (properties.containsKey(HiveTable.AWS_CONN_TIMEOUT_MS)) {
+ s3Properties.put("fs.s3a.connection.timeout", properties.get(HiveTable.AWS_CONN_TIMEOUT_MS));
+ s3Properties.put(HiveTable.AWS_CONN_TIMEOUT_MS, properties.get(HiveTable.AWS_CONN_TIMEOUT_MS));
}
s3Properties.put("fs.s3.impl.disable.cache", "true");
s3Properties.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
s3Properties.put("fs.s3a.attempts.maximum", "2");
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (entry.getKey().startsWith(HiveTable.S3_FS_PREFIX)) {
+ s3Properties.put(entry.getKey(), entry.getValue());
+ }
+ }
return s3Properties;
}
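For reference, the new optional connection keys added in HiveTable map onto hadoop-aws settings here (AWS_REGION -> fs.s3a.endpoint.region, AWS_MAX_CONN_SIZE -> fs.s3a.connection.maximum, and so on). A sketch of a catalog that uses them, with illustrative values:
```sql
CREATE CATALOG hms_catalog_tuned PROPERTIES (
    "type" = "hms",
    "hive.metastore.uris" = "thrift://localhost:9083",
    "AWS_ACCESS_KEY" = "your access key",
    "AWS_SECRET_KEY" = "your secret key",
    "AWS_ENDPOINT" = "s3 endpoint",
    "AWS_REGION" = "s3-region",
    "AWS_MAX_CONN_SIZE" = "100",
    "AWS_REQUEST_TIMEOUT_MS" = "3000",
    "AWS_CONN_TIMEOUT_MS" = "1000");
```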
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 78ef63f257..9fbb8fa3fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -74,7 +74,8 @@ import java.util.Random;
import java.util.Set;
/**
- * ExternalFileScanNode for the file access type of datasource, now only support hive,hudi and iceberg.
+ * ExternalFileScanNode for the file access type of datasource; currently only
+ * Hive, Hudi and Iceberg are supported.
*/
public class ExternalFileScanNode extends ExternalScanNode {
private static final Logger LOG = LogManager.getLogger(ExternalFileScanNode.class);
@@ -219,9 +220,11 @@ public class ExternalFileScanNode extends ExternalScanNode {
if (scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) {
Map<String, String> serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim"))
- ? HIVE_DEFAULT_COLUMN_SEPARATOR : serDeInfoParams.get("field.delim");
+ ? HIVE_DEFAULT_COLUMN_SEPARATOR
+ : serDeInfoParams.get("field.delim");
String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim"))
- ? HIVE_DEFAULT_LINE_DELIMITER : serDeInfoParams.get("line.delim");
+ ? HIVE_DEFAULT_LINE_DELIMITER
+ : serDeInfoParams.get("line.delim");
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
textParams.setLineDelimiterStr(lineDelimiter);
@@ -246,7 +249,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
slotDescByName.put(column.getName(), slotDesc);
}
- // Hive table must extract partition value from path and hudi/iceberg table keep partition field in file.
+ // A Hive table must extract partition values from the file path, while
+ // Hudi/Iceberg tables keep partition fields in the data files.
partitionKeys.addAll(scanProvider.getPathPartitionKeys());
context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
for (SlotDescriptor slot : desc.getSlots()) {
@@ -276,14 +280,14 @@ public class ExternalFileScanNode extends ExternalScanNode {
// If fileFormat is not null, we use fileFormat instead of check file's suffix
private void buildScanRange() throws UserException, IOException {
scanRangeLocations = Lists.newArrayList();
- InputSplit[] inputSplits = scanProvider.getSplits(conjuncts);
- if (0 == inputSplits.length) {
+ List<InputSplit> inputSplits = scanProvider.getSplits(conjuncts);
+ if (inputSplits.isEmpty()) {
return;
}
- inputSplitsNum = inputSplits.length;
+ inputSplitsNum = inputSplits.size();
- String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
- String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
+ String fullPath = ((FileSplit) inputSplits.get(0)).getPath().toUri().toString();
+ String filePath = ((FileSplit) inputSplits.get(0)).getPath().toUri().getPath();
String fsName = fullPath.replace(filePath, "");
context.params.setFileType(scanProvider.getTableFileType());
context.params.setFormatType(scanProvider.getTableFormatType());
@@ -292,6 +296,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties());
tHdfsParams.setFsName(fsName);
context.params.setHdfsParams(tHdfsParams);
+ } else if (scanProvider.getTableFileType() == TFileType.FILE_S3) {
+ context.params.setProperties(hmsTable.getS3Properties());
}
TScanRangeLocations curLocations = newLocations(context.params);
@@ -312,6 +318,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
+ fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")"
+ " loaction: " + Joiner.on("|").join(split.getLocations()));
+
fileSplitStrategy.update(fileSplit);
// Add a new location when it's can be split
if (fileSplitStrategy.hasNext()) {
@@ -353,10 +360,15 @@ public class ExternalFileScanNode extends ExternalScanNode {
FileSplit fileSplit,
List<String> columnsFromPath) throws DdlException, MetaNotFoundException {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
- rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
rangeDesc.setStartOffset(fileSplit.getStart());
rangeDesc.setSize(fileSplit.getLength());
rangeDesc.setColumnsFromPath(columnsFromPath);
+
+ if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
+ rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
+ } else if (scanProvider.getTableFileType() == TFileType.FILE_S3) {
+ rangeDesc.setPath(fileSplit.getPath().toString());
+ }
return rangeDesc;
}
@@ -417,4 +429,3 @@ public class ExternalFileScanNode extends ExternalScanNode {
return output.toString();
}
}
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
index ebdd7a768a..36e11d8845 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
@@ -37,11 +37,11 @@ import java.util.Map;
public interface ExternalFileScanProvider {
TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException;
- TFileType getTableFileType();
+ TFileType getTableFileType() throws DdlException, MetaNotFoundException;
String getMetaStoreUrl();
- InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException;
+ List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException;
Table getRemoteHiveTable() throws DdlException, MetaNotFoundException;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
index c60ee3c211..849511b0bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
@@ -27,6 +27,7 @@ import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -39,6 +40,7 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -69,8 +71,16 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider {
}
@Override
- public TFileType getTableFileType() {
- return TFileType.FILE_HDFS;
+ public TFileType getTableFileType() throws DdlException, MetaNotFoundException {
+ String location = hmsTable.getRemoteTable().getSd().getLocation();
+ if (location != null && !location.isEmpty()) {
+ if (location.startsWith("s3a") || location.startsWith("s3n")) {
+ return TFileType.FILE_S3;
+ } else if (location.startsWith("hdfs:")) {
+ return TFileType.FILE_HDFS;
+ }
+ }
+ throw new DdlException("Unknown file type for hms table.");
}
@Override
@@ -79,34 +89,50 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider {
}
@Override
- public InputSplit[] getSplits(List<Expr> exprs)
+ public List<InputSplit> getSplits(List<Expr> exprs)
throws IOException, UserException {
String splitsPath = getRemoteHiveTable().getSd().getLocation();
List<String> partitionKeys = getRemoteHiveTable().getPartitionKeys()
.stream().map(FieldSchema::getName).collect(Collectors.toList());
+ List<Partition> hivePartitions = new ArrayList<>();
if (partitionKeys.size() > 0) {
ExprNodeGenericFuncDesc hivePartitionPredicate = HiveMetaStoreClientHelper.convertToHivePartitionExpr(
exprs, partitionKeys, hmsTable.getName());
String metaStoreUris = getMetaStoreUrl();
- List<Partition> hivePartitions = HiveMetaStoreClientHelper.getHivePartitions(
- metaStoreUris, getRemoteHiveTable(), hivePartitionPredicate);
- if (!hivePartitions.isEmpty()) {
- splitsPath = hivePartitions.stream().map(x -> x.getSd().getLocation())
- .collect(Collectors.joining(","));
- }
+ hivePartitions.addAll(HiveMetaStoreClientHelper.getHivePartitions(
+ metaStoreUris, getRemoteHiveTable(), hivePartitionPredicate));
}
String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
Configuration configuration = setConfiguration();
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
+ if (!hivePartitions.isEmpty()) {
+ return hivePartitions.parallelStream()
+ .flatMap(x -> getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream())
+ .collect(Collectors.toList());
+ } else {
+ return getSplitsByPath(inputFormat, configuration, splitsPath);
+ }
+ }
+
+ private List<InputSplit> getSplitsByPath(
+ InputFormat<?, ?> inputFormat,
+ Configuration configuration,
+ String splitsPath) {
JobConf jobConf = new JobConf(configuration);
FileInputFormat.setInputPaths(jobConf, splitsPath);
- return inputFormat.getSplits(jobConf, 0);
+ try {
+ InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
+ return Lists.newArrayList(splits);
+ } catch (IOException e) {
+ return new ArrayList<InputSplit>();
+ }
}
+
protected Configuration setConfiguration() {
Configuration conf = new Configuration();
Map<String, String> dfsProperties = hmsTable.getDfsProperties();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
index 83ddc86c4d..10fd85b242 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
@@ -24,7 +24,6 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -56,7 +55,7 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException {
TFileFormatType type;
- String icebergFormat = getRemoteHiveTable().getParameters()
+ String icebergFormat = getRemoteHiveTable().getParameters()
.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
if (icebergFormat.equals("parquet")) {
type = TFileFormatType.FORMAT_PARQUET;
@@ -69,12 +68,7 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
}
@Override
- public TFileType getTableFileType() {
- return TFileType.FILE_HDFS;
- }
-
- @Override
- public InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException {
+ public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException {
List<Expression> expressions = new ArrayList<>();
for (Expr conjunct : exprs) {
Expression expression = IcebergUtils.convertToIcebergExpr(conjunct);
@@ -88,7 +82,7 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
for (Expression predicate : expressions) {
scan = scan.filter(predicate);
}
- List<FileSplit> splits = new ArrayList<>();
+ List<InputSplit> splits = new ArrayList<>();
for (FileScanTask task : scan.planFiles()) {
for (FileScanTask spitTask : task.split(128 * 1024 * 1024)) {
@@ -96,7 +90,7 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
spitTask.start(), spitTask.length(), new String[0]));
}
}
- return splits.toArray(new InputSplit[0]);
+ return splits;
}
private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index b6d0a3b19a..54f50085a9 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -237,6 +237,8 @@ struct TFileScanRangeParams {
6: optional THdfsParams hdfs_params;
7: optional TFileTextScanRangeParams text_params;
+ // properties of the file storage, such as S3 connection information
+ 8: optional map<string, string> properties;
}
struct TFileRangeDesc {