You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/30 01:43:42 UTC
[incubator-doris] branch master updated: [feature](hudi) Step2: Support query hudi external table(include cow and mor table) (#9752)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8092439634 [feature](hudi) Step2: Support query hudi external table(include cow and mor table) (#9752)
8092439634 is described below
commit 8092439634af22f59b73e4029bed9bce770cbd73
Author: dujl <du...@bytedance.com>
AuthorDate: Mon May 30 09:43:36 2022 +0800
[feature](hudi) Step2: Support query hudi external table(include cow and mor table) (#9752)
Support querying Hudi copy-on-write (COW) and merge-on-read (MOR) tables.
---
fe/fe-core/pom.xml | 7 +
.../java/org/apache/doris/analysis/Analyzer.java | 7 +
.../java/org/apache/doris/catalog/Catalog.java | 8 +
.../doris/catalog/HiveMetaStoreClientHelper.java | 45 ++-
.../apache/doris/external/hive/util/HiveUtil.java | 183 ++++++++++
.../org/apache/doris/external/hudi/HudiTable.java | 2 +-
.../org/apache/doris/external/hudi/HudiUtils.java | 55 ++-
.../org/apache/doris/load/BrokerFileGroup.java | 39 ++-
.../org/apache/doris/planner/BrokerScanNode.java | 38 +-
.../org/apache/doris/planner/HiveScanNode.java | 2 +-
.../org/apache/doris/planner/HudiScanNode.java | 381 +++++++++++++++++++++
.../org/apache/doris/planner/IcebergScanNode.java | 5 +-
.../apache/doris/planner/SingleNodePlanner.java | 4 +
.../apache/doris/analysis/CreateTableStmtTest.java | 23 ++
fe/pom.xml | 6 +
15 files changed, 752 insertions(+), 53 deletions(-)
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index b975bdafdd..305e2a4ce6 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -68,6 +68,13 @@ under the License.
<type>pom</type>
<scope>import</scope>
</dependency>
+ <!-- hadoop -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client</artifactId>
+ <version>2.7.4</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
<dependencies>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index cebd197fd5..d858c50b63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -38,6 +38,8 @@ import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.common.VecNotImplException;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.external.hudi.HudiTable;
+import org.apache.doris.external.hudi.HudiUtils;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.qe.ConnectContext;
@@ -672,6 +674,11 @@ public class Analyzer {
}
}
+ if (table.getType() == TableType.HUDI && table.getFullSchema().isEmpty()) {
+ // the schema stored in doris meta is empty, so resolve the hudi table's schema from the hive metastore
+ table = HudiUtils.resolveHudiTable((HudiTable) table);
+ }
+
// tableName.getTbl() stores the table name specified by the user in the from statement.
// In the case of case-sensitive table names, the value of tableName.getTbl() is the same as table.getName().
// However, since the system view is not case-sensitive, table.getName() gets the lowercase view name,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 3404650378..45dc0ad90d 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -4158,6 +4158,14 @@ public class Catalog {
if (!hudiTable.getFullSchema().isEmpty()) {
HudiUtils.validateColumns(hudiTable, hiveTable);
}
+ switch (hiveTable.getTableType()) {
+ case "EXTERNAL_TABLE":
+ case "MANAGED_TABLE":
+ break;
+ case "VIRTUAL_VIEW":
+ default:
+ throw new DdlException("unsupported hudi table type [" + hiveTable.getTableType() + "].");
+ }
// check hive table if exists in doris database
if (!db.createTableWithLock(hudiTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 5b11708bfd..1f2a651d1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -158,7 +158,7 @@ public class HiveMetaStoreClientHelper {
}
/**
- * Get data files of partitions in hive table, filter by partition predicate
+ * Get data files of partitions in hive table, filter by partition predicate.
* @param hiveTable
* @param hivePartitionPredicate
* @param fileStatuses
@@ -167,22 +167,13 @@ public class HiveMetaStoreClientHelper {
* @throws DdlException
*/
public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
- List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl) throws DdlException {
- HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
-
+ List<TBrokerFileStatus> fileStatuses,
+ Table remoteHiveTbl) throws DdlException {
List<RemoteIterator<LocatedFileStatus>> remoteIterators;
if (remoteHiveTbl.getPartitionKeys().size() > 0) {
+ String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS);
// hive partitioned table, get file iterator from table partition sd info
- List<Partition> hivePartitions = new ArrayList<>();
- try {
- client.listPartitionsByExpr(hiveTable.getHiveDb(), hiveTable.getHiveTable(),
- SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), null, (short) -1, hivePartitions);
- } catch (TException e) {
- LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
- throw new DdlException("Connect hive metastore failed. Error: " + e.getMessage());
- } finally {
- client.close();
- }
+ List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate);
remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties());
} else {
// hive non-partitioned table, get file iterator from table sd info
@@ -219,6 +210,32 @@ public class HiveMetaStoreClientHelper {
return hdfsUrl;
}
+ /**
+ * list partitions from hiveMetaStore.
+ *
+ * @param metaStoreUris hiveMetaStore uris
+ * @param remoteHiveTbl Hive table
+ * @param hivePartitionPredicate filter when list partitions
+ * @return a list of hive partitions
+ * @throws DdlException when connect hiveMetaStore failed.
+ */
+ public static List<Partition> getHivePartitions(String metaStoreUris, Table remoteHiveTbl,
+ ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException {
+ List<Partition> hivePartitions = new ArrayList<>();
+ HiveMetaStoreClient client = getClient(metaStoreUris);
+ try {
+ client.listPartitionsByExpr(remoteHiveTbl.getDbName(), remoteHiveTbl.getTableName(),
+ SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate),
+ null, (short) -1, hivePartitions);
+ } catch (TException e) {
+ LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
+ throw new DdlException("Connect hive metastore failed.");
+ } finally {
+ client.close();
+ }
+ return hivePartitions;
+ }
+
private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions, Map<String, String> properties) throws DdlException {
List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
Configuration configuration = new Configuration(false);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
new file mode 100644
index 0000000000..03d0d5ebec
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.external.hive.util;
+
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Hive utils used when creating or querying hive-backed external tables:
+ * input format lookup and hive-to-Doris schema conversion.
+ */
+public final class HiveUtil {
+    private static final Logger LOG = LogManager.getLogger(HiveUtil.class);
+
+    private HiveUtil() {
+    }
+
+    /**
+     * Create an input format instance from its class name.
+     *
+     * @param configuration configuration used to build the JobConf passed to the input format
+     * @param inputFormatName fully-qualified input format class name
+     * @param symlinkTarget when true and the input format is SymlinkTextInputFormat, use TextInputFormat instead
+     * @return a new input format instance
+     * @throws UserException when the class cannot be found or the instance cannot be created
+     */
+    public static InputFormat<?, ?> getInputFormat(Configuration configuration,
+                                                   String inputFormatName, boolean symlinkTarget) throws UserException {
+        try {
+            JobConf jobConf = new JobConf(configuration);
+
+            Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
+            if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) {
+                // symlink targets are always TextInputFormat
+                inputFormatClass = TextInputFormat.class;
+            }
+
+            return ReflectionUtils.newInstance(inputFormatClass, jobConf);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            throw new UserException("Unable to create input format " + inputFormatName, e);
+        }
+    }
+
+    /**
+     * Load an input format class by name; CDH's Parquet input format names map to MapredParquetInputFormat.
+     */
+    @SuppressWarnings({"unchecked", "RedundantCast"})
+    private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
+            throws ClassNotFoundException {
+        // CDH uses different names for Parquet
+        if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName)
+                || "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
+            return MapredParquetInputFormat.class;
+        }
+
+        Class<?> clazz = conf.getClassByName(inputFormatName);
+        return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
+    }
+
+    /**
+     * Transform a hive schema into a Doris schema, preserving column order.
+     *
+     * <p>Unsupported columns are not skipped: the first column whose type has no Doris equivalent makes the
+     * whole transformation fail with an {@link UnsupportedOperationException}.
+     *
+     * @param hiveSchema hive schema
+     * @return doris schema
+     * @throws AnalysisException declared for callers; unsupported types surface as UnsupportedOperationException
+     */
+    public static List<Column> transformHiveSchema(List<FieldSchema> hiveSchema) throws AnalysisException {
+        List<Column> newSchema = Lists.newArrayList();
+        for (FieldSchema hiveColumn : hiveSchema) {
+            try {
+                newSchema.add(HiveUtil.transformHiveField(hiveColumn));
+            } catch (UnsupportedOperationException e) {
+                // the column is not ignored: log which column failed, then propagate the failure
+                LOG.warn("Unsupported data type in Doris, column[{}], with error: {}",
+                        hiveColumn.getName(), e.getMessage());
+                throw e;
+            }
+        }
+        return newSchema;
+    }
+
+    /**
+     * Transform a hive field into a Doris column with the same name and comment.
+     *
+     * @param field hive field to be transformed
+     * @return doris column
+     * @throws UnsupportedOperationException when the field's type has no Doris equivalent
+     */
+    public static Column transformHiveField(FieldSchema field) {
+        TypeInfo hiveTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
+        Type type = convertHiveTypeToDoris(hiveTypeInfo);
+        // NOTE(review): flags presumably mean isKey=false, aggregateType=null, isAllowNull=true, defaultValue=null
+        // — confirm against Column's constructor
+        return new Column(field.getName(), type, false, null, true, null, field.getComment());
+    }
+
+    /**
+     * Map a hive type to a Doris type. Primitive types and arrays of primitive types are supported.
+     */
+    private static Type convertHiveTypeToDoris(TypeInfo hiveTypeInfo) {
+        switch (hiveTypeInfo.getCategory()) {
+            case PRIMITIVE: {
+                PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) hiveTypeInfo;
+                switch (primitiveTypeInfo.getPrimitiveCategory()) {
+                    case VOID:
+                        return Type.NULL;
+                    case BOOLEAN:
+                        return Type.BOOLEAN;
+                    case BYTE:
+                        return Type.TINYINT;
+                    case SHORT:
+                        return Type.SMALLINT;
+                    case INT:
+                        return Type.INT;
+                    case LONG:
+                        return Type.BIGINT;
+                    case FLOAT:
+                        return Type.FLOAT;
+                    case DOUBLE:
+                        return Type.DOUBLE;
+                    case STRING:
+                        return Type.STRING;
+                    // NOTE(review): CHAR, VARCHAR and DECIMAL map to the generic Doris types below; the hive
+                    // length / precision / scale is not carried over.
+                    case CHAR:
+                        return Type.CHAR;
+                    case VARCHAR:
+                        return Type.VARCHAR;
+                    case DATE:
+                        return Type.DATE;
+                    case TIMESTAMP:
+                        return Type.DATETIME;
+                    case DECIMAL:
+                        return Type.DECIMALV2;
+                    default:
+                        throw new UnsupportedOperationException("Unsupported type: "
+                                + primitiveTypeInfo.getPrimitiveCategory());
+                }
+            }
+            case LIST: {
+                TypeInfo elementTypeInfo = ((ListTypeInfo) hiveTypeInfo).getListElementTypeInfo();
+                // only arrays whose elements are primitive types are supported
+                if (elementTypeInfo.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+                    throw new UnsupportedOperationException("Unsupported type: " + hiveTypeInfo.toString());
+                }
+                return new ArrayType(convertHiveTypeToDoris(elementTypeInfo));
+            }
+            case MAP:
+            case STRUCT:
+            case UNION:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + hiveTypeInfo.toString());
+        }
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java
index 0e8e350eb3..cfc491e35e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java
@@ -117,7 +117,7 @@ public class HudiTable extends Table {
thriftHudiTable.setTableName(getHmsTableName());
thriftHudiTable.setProperties(getTableProperties());
- TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.HUDI_TABLE,
+ TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
fullSchema.size(), 0, getName(), "");
thriftTableDescriptor.setHudiTable(thriftHudiTable);
return thriftTableDescriptor;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
index b1a65ad41d..757fdc2d45 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
@@ -17,21 +17,31 @@
package org.apache.doris.external.hudi;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.external.hive.util.HiveUtil;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Hudi utils.
*/
public class HudiUtils {
+ private static final Logger LOG = LogManager.getLogger(HudiUtils.class);
private static final String PROPERTY_MISSING_MSG =
"Hudi table %s is null. Please add properties('%s'='xxx') when create table";
@@ -117,12 +127,53 @@ public class HudiUtils {
Set<String> hudiColumnNames = table.getFullSchema().stream()
.map(x -> x.getName()).collect(Collectors.toSet());
- Set<String> hiveTableColumnNames = hiveTable.getSd().getCols()
- .stream().map(x -> x.getName()).collect(Collectors.toSet());
+ Set<String> hiveTableColumnNames =
+ Stream.concat(hiveTable.getSd().getCols().stream(), hiveTable.getPartitionKeys().stream())
+ .map(x -> x.getName()).collect(Collectors.toSet());
hudiColumnNames.removeAll(hiveTableColumnNames);
if (hudiColumnNames.size() > 0) {
throw new DdlException(String.format("Hudi table's column(s): {%s} didn't exist in hive table. ",
String.join(", ", hudiColumnNames)));
}
}
+
+    /**
+     * Resolve a hudi table's schema from the hive metastore.
+     *
+     * <p>The returned table keeps the id, name and properties of {@code table}. Its columns come from the remote
+     * hive table: the storage descriptor columns followed by the partition keys. Hive keeps partition keys out of
+     * {@code getSd().getCols()}, and {@code validateColumns} also treats them as table columns, so they are
+     * appended here.
+     *
+     * @param table a doris hudi table
+     * @return a new doris hudi table carrying the resolved schema
+     * @throws AnalysisException when the metastore lookup fails, or the remote table does not exist or is not
+     *         a hudi table
+     */
+    public static HudiTable resolveHudiTable(HudiTable table) throws AnalysisException {
+        String metastoreUris = table.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
+        org.apache.hadoop.hive.metastore.api.Table remoteHiveTable;
+        try {
+            remoteHiveTable = HiveMetaStoreClientHelper.getTable(
+                    table.getHmsDatabaseName(),
+                    table.getHmsTableName(),
+                    metastoreUris);
+        } catch (DdlException e) {
+            LOG.error("Failed to get table from HiveMetaStore", e);
+            throw new AnalysisException(ErrorCode.ERR_UNKNOWN_ERROR.formatErrorMsg());
+        }
+        if (remoteHiveTable == null || !HudiUtils.isHudiTable(remoteHiveTable)) {
+            throw new AnalysisException(ErrorCode.ERR_UNKNOWN_TABLE.formatErrorMsg(table.getHmsTableName(),
+                    "HiveMetaStore"));
+        }
+
+        // append the partition keys after the data columns; without them partition columns would be
+        // missing from the resolved schema
+        List<Column> newSchema = HiveUtil.transformHiveSchema(
+                Stream.concat(remoteHiveTable.getSd().getCols().stream(),
+                        remoteHiveTable.getPartitionKeys().stream())
+                        .collect(Collectors.toList()));
+        return new HudiTable(table.getId(), table.getName(), newSchema, table.getTableProperties());
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 8d0d37643b..ab402d728c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -27,7 +27,6 @@ import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.HiveTable;
-import org.apache.doris.catalog.IcebergTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
@@ -37,7 +36,6 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
-import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.loadv2.LoadTask;
@@ -76,6 +74,7 @@ public class BrokerFileGroup implements Writable {
private List<Long> fileSize;
private List<String> fileFieldNames;
+ // names of columns whose values are parsed from the file path (e.g. partition columns)
private List<String> columnsFromPath;
// columnExprList includes all fileFieldNames, columnsFromPath and column mappings
// this param will be recreated by data desc when the log replay
@@ -119,15 +118,26 @@ public class BrokerFileGroup implements Writable {
this.fileFormat = table.getFileFormat();
}
- // Used for hive table, no need to parse
- public BrokerFileGroup(HiveTable table,
- String columnSeparator,
- String lineDelimiter,
+ /**
+ * Used for hive/iceberg/hudi external tables; the column separator defaults to "|" and the line delimiter to "\n".
+ */
+ public BrokerFileGroup(long tableId,
String filePath,
- String fileFormat,
- List<String> columnsFromPath,
- List<ImportColumnDesc> columnExprList) throws AnalysisException {
- this.tableId = table.getId();
+ String fileFormat) throws AnalysisException {
+ this(tableId, "|", "\n", filePath, fileFormat, null, null);
+ }
+
+ /**
+ * Used for hive/iceberg/hudi external tables, with explicit separators, path columns and column mappings.
+ */
+ public BrokerFileGroup(long tableId,
+ String columnSeparator,
+ String lineDelimiter,
+ String filePath,
+ String fileFormat,
+ List<String> columnsFromPath,
+ List<ImportColumnDesc> columnExprList) throws AnalysisException {
+ this.tableId = tableId;
this.valueSeparator = Separator.convertSeparator(columnSeparator);
this.lineDelimiter = Separator.convertSeparator(lineDelimiter);
this.isNegative = false;
@@ -137,15 +147,6 @@ public class BrokerFileGroup implements Writable {
this.columnExprList = columnExprList;
}
- // Used for iceberg table, no need to parse
- public BrokerFileGroup(IcebergTable table) throws UserException {
- this.tableId = table.getId();
- this.isNegative = false;
- this.valueSeparator = "|";
- this.lineDelimiter = "\n";
- this.fileFormat = table.getFileFormat();
- }
-
public BrokerFileGroup(DataDescription dataDescription) {
this.fileFieldNames = dataDescription.getFileFieldNames();
this.columnsFromPath = dataDescription.getColumnsFromPath();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 14cf1164bc..dc4979c2e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -134,7 +134,7 @@ public class BrokerScanNode extends LoadScanNode {
private Analyzer analyzer;
- private static class ParamCreateContext {
+ protected static class ParamCreateContext {
public BrokerFileGroup fileGroup;
public TBrokerScanRangeParams params;
public TupleDescriptor srcTupleDescriptor;
@@ -190,6 +190,10 @@ public class BrokerScanNode extends LoadScanNode {
}
}
+ /**
+ * Returns this node's paramCreateContexts list itself (not a copy); changes made by callers affect this node.
+ */
+ public List<ParamCreateContext> getParamCreateContexts() {
+ return paramCreateContexts;
+ }
+
protected void initFileGroup() throws UserException {
BrokerTable brokerTable = (BrokerTable) desc.getTable();
try {
@@ -277,13 +281,15 @@ public class BrokerScanNode extends LoadScanNode {
}
}
- Load.initColumns(targetTable, columnDescs,
- context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer,
- context.srcTupleDescriptor, context.slotDescByName, context.params,
- formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized());
+ if (targetTable != null) {
+ Load.initColumns(targetTable, columnDescs,
+ context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer,
+ context.srcTupleDescriptor, context.slotDescByName, context.params,
+ formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized());
+ }
}
- private TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDesc brokerDesc)
+ protected TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDesc brokerDesc)
throws UserException {
Backend selectedBackend;
@@ -332,10 +338,6 @@ public class BrokerScanNode extends LoadScanNode {
return locations;
}
- private TBrokerScanRange brokerScanRange(TScanRangeLocations locations) {
- return locations.scan_range.broker_scan_range;
- }
-
private void getFileStatusAndCalcInstance() throws UserException {
if (fileStatusesList == null || filesAdded == -1) {
// FIXME(cmy): fileStatusesList and filesAdded can be set out of db lock when doing pull load,
@@ -346,7 +348,10 @@ public class BrokerScanNode extends LoadScanNode {
filesAdded = 0;
this.getFileStatus();
}
- Preconditions.checkState(fileStatusesList.size() == fileGroups.size());
+ // HudiScanNode computes its scan ranges in its own way and does not use fileStatusesList, so skip this check for it
+ if (!(this instanceof HudiScanNode)) {
+ Preconditions.checkState(fileStatusesList.size() == fileGroups.size());
+ }
if (isLoad() && filesAdded == 0) {
throw new UserException("No source file in this table(" + targetTable.getName() + ").");
@@ -524,7 +529,7 @@ public class BrokerScanNode extends LoadScanNode {
rangeDesc.setNumAsString(context.fileGroup.isNumAsString());
rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine());
}
- brokerScanRange(curLocations).addToRanges(rangeDesc);
+ curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
curFileOffset += rangeBytes;
} else {
@@ -537,7 +542,7 @@ public class BrokerScanNode extends LoadScanNode {
}
rangeDesc.setReadByColumnDef(true);
- brokerScanRange(curLocations).addToRanges(rangeDesc);
+ curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
curFileOffset = 0;
i++;
}
@@ -565,7 +570,7 @@ public class BrokerScanNode extends LoadScanNode {
}
rangeDesc.setReadByColumnDef(true);
- brokerScanRange(curLocations).addToRanges(rangeDesc);
+ curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
curFileOffset = 0;
curInstanceBytes += leftBytes;
i++;
@@ -573,7 +578,7 @@ public class BrokerScanNode extends LoadScanNode {
}
// Put the last file
- if (brokerScanRange(curLocations).isSetRanges()) {
+ if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) {
locationsList.add(curLocations);
}
}
@@ -589,7 +594,10 @@ public class BrokerScanNode extends LoadScanNode {
rangeDesc.setSplittable(fileStatus.isSplitable);
rangeDesc.setStartOffset(curFileOffset);
rangeDesc.setSize(rangeBytes);
+ // fileSize is only used when the format is orc or parquet and the TFileType is broker;
+ // for other TFileTypes it is not needed.
rangeDesc.setFileSize(fileStatus.size);
+ // The backend appends the columnsFromPath values to the end of each row after reading the data from the file.
rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile);
rangeDesc.setColumnsFromPath(columnsFromPath);
rangeDesc.setHeaderType(headerType);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
index c22b4842e2..ee9b1603bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
@@ -115,7 +115,7 @@ public class HiveScanNode extends BrokerScanNode {
HiveTable hiveTable = (HiveTable) desc.getTable();
fileGroups = Lists.newArrayList(
- new BrokerFileGroup(hiveTable,
+ new BrokerFileGroup(hiveTable.getId(),
getColumnSeparator(),
getLineDelimiter(),
getPath(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
new file mode 100644
index 0000000000..756441f396
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ImportColumnDesc;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.external.hive.util.HiveUtil;
+import org.apache.doris.external.hudi.HudiProperty;
+import org.apache.doris.external.hudi.HudiTable;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerRangeDesc;
+import org.apache.doris.thrift.TBrokerScanRangeParams;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.THdfsParams;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.mortbay.log.Log;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Hudi scan node to query hudi table.
+ *
+ * <p>Table metadata (input format, base path, partition keys) is read from the Hive metastore,
+ * partition predicates are pushed down to the metastore to prune partitions, and file splits are
+ * produced by the table's own Hadoop {@link InputFormat}. Each split becomes one scan range.
+ */
+public class HudiScanNode extends BrokerScanNode {
+    private static final Logger LOG = LogManager.getLogger(HudiScanNode.class);
+
+    private HudiTable hudiTable;
+    // partition column predicates of hive table
+    private List<ExprNodeDesc> hivePredicates = new ArrayList<>();
+    private ExprNodeGenericFuncDesc hivePartitionPredicate;
+    private List<ImportColumnDesc> parsedColumnExprList = new ArrayList<>();
+    // NOTE(review): never assigned in this class, so getHdfsUri() currently returns null.
+    private String hdfsUri;
+
+    private Table remoteHiveTable;
+
+    /* hudi table properties */
+    private String fileFormat;
+    private String inputFormatName;
+    private String basePath;
+    private List<String> partitionKeys = new ArrayList<>();
+    /* hudi table properties */
+
+    // Built in finalize(); one entry per input split.
+    private List<TScanRangeLocations> scanRangeLocations;
+
+    public HudiScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
+            List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
+        super(id, destTupleDesc, planNodeName, fileStatusesList, filesAdded);
+        this.hudiTable = (HudiTable) destTupleDesc.getTable();
+    }
+
+    public String getHdfsUri() {
+        return hdfsUri;
+    }
+
+    public List<ImportColumnDesc> getParsedColumnExprList() {
+        return parsedColumnExprList;
+    }
+
+    public String getFileFormat() {
+        return fileFormat;
+    }
+
+    public String getBasePath() {
+        return basePath;
+    }
+
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    /**
+     * Initializes the node. super.init invokes initFileGroup, which
+     * 1. fetches the hudi table from the hive metastore,
+     * 2. resolves the input format, table base path and partition columns,
+     * 3. generates the file group.
+     * The source slot descriptors are then created by initParams.
+     *
+     * @param analyzer analyzer
+     * @throws UserException when init failed.
+     */
+    @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+        // init scan range params
+        initParams(analyzer);
+    }
+
+    /**
+     * Returns one instance per scan range. Only valid after finalize(), which builds the ranges.
+     */
+    @Override
+    public int getNumInstances() {
+        return scanRangeLocations.size();
+    }
+
+    @Override
+    protected void initFileGroup() throws UserException {
+        resolveHiveTable();
+        analyzeColumnFromPath();
+
+        fileGroups = Lists.newArrayList(
+                new BrokerFileGroup(hudiTable.getId(),
+                        "\t",
+                        "\n",
+                        getBasePath(),
+                        getFileFormat(),
+                        getPartitionKeys(),
+                        getParsedColumnExprList()));
+        brokerDesc = new BrokerDesc("HudiTableDesc", StorageBackend.StorageType.HDFS, hudiTable.getTableProperties());
+    }
+
+    /**
+     * Replaces the parent's file listing. Splits are produced later by the table's input format in
+     * buildScanRange(), so no file statuses are collected here.
+     */
+    @Override
+    protected void getFileStatus() throws DdlException {
+        if (!partitionKeys.isEmpty()) {
+            extractHivePartitionPredicate();
+        }
+        // set fileStatusesList as empty, we do not need fileStatusesList
+        fileStatusesList = Lists.newArrayList();
+        filesAdded = 0;
+    }
+
+    @Override
+    public void finalize(Analyzer analyzer) throws UserException {
+        try {
+            ParamCreateContext context = getParamCreateContexts().get(0);
+            finalizeParams(context.slotDescByName, context.exprMap, context.params,
+                    context.srcTupleDescriptor, false, context.fileGroup.isNegative(), analyzer);
+        } catch (AnalysisException e) {
+            // keep the original exception as the cause so the stack trace is not lost
+            throw new UserException(e.getMessage(), e);
+        }
+        try {
+            buildScanRange();
+        } catch (IOException e) {
+            LOG.error("Build scan range failed.", e);
+            throw new UserException("Build scan range failed.", e);
+        }
+    }
+
+    @Override
+    public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
+        return scanRangeLocations;
+    }
+
+    /**
+     * Loads the table from the hive metastore and records its input format, file format,
+     * base path and partition key names.
+     */
+    private void resolveHiveTable() throws DdlException {
+        this.remoteHiveTable = HiveMetaStoreClientHelper.getTable(
+                hudiTable.getHmsDatabaseName(),
+                hudiTable.getHmsTableName(),
+                hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS));
+
+        this.inputFormatName = remoteHiveTable.getSd().getInputFormat();
+        this.fileFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(this.inputFormatName);
+        this.basePath = remoteHiveTable.getSd().getLocation();
+        for (FieldSchema fieldSchema : remoteHiveTable.getPartitionKeys()) {
+            this.partitionKeys.add(fieldSchema.getName());
+        }
+        LOG.info("Hudi inputFileFormat is {}, basePath is {}", inputFormatName, this.basePath);
+    }
+
+    /**
+     * Creates one nullable VARCHAR source slot per table column and registers it in the scan params.
+     */
+    private void initParams(Analyzer analyzer) {
+        ParamCreateContext context = getParamCreateContexts().get(0);
+        TBrokerScanRangeParams params = context.params;
+
+        Map<String, SlotDescriptor> slotDescByName = Maps.newHashMap();
+
+        List<Column> columns = hudiTable.getBaseSchema(false);
+        // init slot desc add expr map, also transform hadoop functions
+        for (Column column : columns) {
+            SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor);
+            slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+            slotDesc.setIsMaterialized(true);
+            slotDesc.setIsNullable(true);
+            slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR));
+            params.addToSrcSlotIds(slotDesc.getId().asInt());
+            slotDescByName.put(column.getName(), slotDesc);
+        }
+        context.slotDescByName = slotDescByName;
+    }
+
+    /**
+     * Extracts partition predicate from SelectStmt.whereClause that can be pushed down to Hive.
+     * The result is stored in hivePartitionPredicate. A predicate that is always true ("1=1") is
+     * used when no conjunct can be pushed down.
+     */
+    private void extractHivePartitionPredicate() throws DdlException {
+        // This method runs from both getFileStatus() and getSplits(); start from scratch so the
+        // pushed-down predicates are not accumulated twice.
+        hivePredicates.clear();
+        ListIterator<Expr> it = conjuncts.listIterator();
+        while (it.hasNext()) {
+            ExprNodeGenericFuncDesc hiveExpr = HiveMetaStoreClientHelper.convertToHivePartitionExpr(
+                    it.next(), partitionKeys, hudiTable.getName());
+            if (hiveExpr != null) {
+                hivePredicates.add(hiveExpr);
+            }
+        }
+        int count = hivePredicates.size();
+        // combine all predicate by `and`
+        // compoundExprs must have at least 2 predicates
+        if (count >= 2) {
+            hivePartitionPredicate = HiveMetaStoreClientHelper.getCompoundExpr(hivePredicates, "and");
+        } else if (count == 1) {
+            // only one predicate
+            hivePartitionPredicate = (ExprNodeGenericFuncDesc) hivePredicates.get(0);
+        } else {
+            // have no predicate, make a dummy predicate "1=1" to get all partitions
+            HiveMetaStoreClientHelper.ExprBuilder exprBuilder =
+                    new HiveMetaStoreClientHelper.ExprBuilder(hudiTable.getName());
+            hivePartitionPredicate = exprBuilder.val(TypeInfoFactory.intTypeInfo, 1)
+                    .val(TypeInfoFactory.intTypeInfo, 1)
+                    .pred("=", 2).build();
+        }
+    }
+
+    /**
+     * Computes input splits with the table's own input format. For a partitioned table only the
+     * locations of the partitions that survive pruning are listed. An empty array is returned
+     * when every partition is pruned.
+     */
+    private InputSplit[] getSplits() throws UserException, IOException {
+        String splitsPath = basePath;
+        if (!partitionKeys.isEmpty()) {
+            extractHivePartitionPredicate();
+
+            String metaStoreUris = hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
+            List<Partition> hivePartitions =
+                    HiveMetaStoreClientHelper.getHivePartitions(metaStoreUris, remoteHiveTable, hivePartitionPredicate);
+            if (hivePartitions == null || hivePartitions.isEmpty()) {
+                // Nothing to read. Without this guard the joined path below would be an empty
+                // string handed to FileInputFormat.setInputPaths.
+                return new InputSplit[0];
+            }
+            splitsPath = hivePartitions.stream()
+                    .map(x -> x.getSd().getLocation()).collect(Collectors.joining(","));
+        }
+
+        Configuration configuration = new Configuration();
+        InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
+        // alway get fileSplits from inputformat,
+        // because all hoodie input format have UseFileSplitsFromInputFormat annotation
+        JobConf jobConf = new JobConf(configuration);
+        FileInputFormat.setInputPaths(jobConf, splitsPath);
+        return inputFormat.getSplits(jobConf, 0);
+    }
+
+    /**
+     * Builds one TScanRangeLocations per input split and stores them in scanRangeLocations.
+     */
+    protected void buildScanRange() throws UserException, IOException {
+        scanRangeLocations = Lists.newArrayList();
+        InputSplit[] inputSplits = getSplits();
+        if (inputSplits.length == 0) {
+            return;
+        }
+
+        // The filesystem name is taken from the first split and shared by every range
+        // (assumes all splits live on the same filesystem).
+        THdfsParams hdfsParams = new THdfsParams();
+        String fsName = getFsName((FileSplit) inputSplits[0]);
+        hdfsParams.setFsName(fsName);
+        LOG.debug("Hudi path's host is {}", fsName);
+
+        TFileFormatType formatType = getFormatType();
+
+        ParamCreateContext context = getParamCreateContexts().get(0);
+        for (InputSplit split : inputSplits) {
+            FileSplit fileSplit = (FileSplit) split;
+
+            TScanRangeLocations curLocations = newLocations(context.params, brokerDesc);
+            List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
+                    getPartitionKeys());
+            // partition values are appended from the path, the remaining columns come from the file
+            int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size();
+
+            TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, formatType,
+                    partitionValuesFromPath, numberOfColumnsFromFile, brokerDesc);
+            rangeDesc.setHdfsParams(hdfsParams);
+            rangeDesc.setReadByColumnDef(true);
+
+            // Each split gets its own scan range location; the range list is never empty here.
+            curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
+            scanRangeLocations.add(curLocations);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Assign to backend {} with hudi split: {} ( {},{})",
+                        curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(),
+                        fileSplit.getStart(), fileSplit.getLength());
+            }
+        }
+    }
+
+    /**
+     * Maps the hive input format class name to a backend file format. Class names are mixed case,
+     * so the match is case-insensitive.
+     */
+    private TFileFormatType getFormatType() throws UserException {
+        String lowerInputFormat = this.inputFormatName.toLowerCase(Locale.ROOT);
+        if (lowerInputFormat.contains("parquet")) {
+            return TFileFormatType.FORMAT_PARQUET;
+        } else if (lowerInputFormat.contains("orc")) {
+            return TFileFormatType.FORMAT_ORC;
+        }
+        throw new UserException("unsupported hudi table type [" + this.inputFormatName + "].");
+    }
+
+    /**
+     * Returns the "scheme://authority" prefix of the split's path, or "scheme:" when there is no
+     * authority. It is built from URI components rather than by stripping the decoded path from
+     * the percent-encoded URI string, which fails for paths with encoded characters.
+     */
+    private static String getFsName(FileSplit fileSplit) {
+        String scheme = fileSplit.getPath().toUri().getScheme();
+        String authority = fileSplit.getPath().toUri().getRawAuthority();
+        if (scheme == null) {
+            return "";
+        }
+        return authority == null ? scheme + ":" : scheme + "://" + authority;
+    }
+
+    private TBrokerRangeDesc createBrokerRangeDesc(FileSplit fileSplit,
+            TFileFormatType formatType,
+            List<String> columnsFromPath, int numberOfColumnsFromFile,
+            BrokerDesc brokerDesc) {
+        TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
+        rangeDesc.setFileType(brokerDesc.getFileType());
+        rangeDesc.setFormatType(formatType);
+        rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
+        rangeDesc.setSplittable(true);
+        rangeDesc.setStartOffset(fileSplit.getStart());
+        rangeDesc.setSize(fileSplit.getLength());
+        rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile);
+        rangeDesc.setColumnsFromPath(columnsFromPath);
+        // set hdfs params for hdfs file type.
+        switch (brokerDesc.getFileType()) {
+            case FILE_HDFS:
+                BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc);
+                break;
+            default:
+                break;
+        }
+        return rangeDesc;
+    }
+
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
+        StringBuilder output = new StringBuilder();
+        if (!isLoad()) {
+            output.append(prefix).append("TABLE: ").append(hudiTable.getName()).append("\n");
+            // NOTE(review): labelled PATH but prints the metastore URIs, as HiveScanNode does.
+            output.append(prefix).append("PATH: ")
+                    .append(hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS)).append("\n");
+        }
+        return output.toString();
+    }
+
+    /**
+     * Analyze columns from path, the partition columns.
+     */
+    private void analyzeColumnFromPath() {
+        for (String colName : partitionKeys) {
+            ImportColumnDesc importColumnDesc = new ImportColumnDesc(colName, null);
+            parsedColumnExprList.add(importColumnDesc);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
index 6bdda5272e..4af73caf51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
@@ -57,7 +57,10 @@ public class IcebergScanNode extends BrokerScanNode {
@Override
protected void initFileGroup() throws UserException {
- fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable));
+ fileGroups = Lists.newArrayList(
+ new BrokerFileGroup(icebergTable.getId(),
+ null,
+ icebergTable.getFileFormat()));
brokerDesc = new BrokerDesc("IcebergTableDesc", icebergTable.getStorageType(),
icebergTable.getIcebergProperties());
targetTable = icebergTable;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index b6aec38c8e..acf6702f70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1708,6 +1708,10 @@ public class SingleNodePlanner {
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode",
null, -1);
break;
+ case HUDI:
+ scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HudiScanNode",
+ null, -1);
+ break;
default:
break;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
index 83c85251ae..1400d8cdd7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
@@ -291,4 +291,27 @@ public class CreateTableStmtTest {
+ "\"hudi.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n"
+ "\"hudi.table\" = \"test\")", stmt.toString());
}
+
+    @Test
+    public void testCreateHudiTableWithSchema() throws UserException {
+        // A hudi external table declared with an explicit column list keeps those columns
+        // (and their nullability) in the generated DDL.
+        Map<String, String> hudiProps = new HashMap<>();
+        hudiProps.put("hudi.database", "doris");
+        hudiProps.put("hudi.table", "test");
+        hudiProps.put("hudi.hive.metastore.uris", "thrift://127.0.0.1:9087");
+
+        CreateTableStmt createStmt = new CreateTableStmt(false, true, tblName, "hudi", hudiProps, "");
+        createStmt.addColumnDef(new ColumnDef("id", TypeDef.create(PrimitiveType.INT)));
+        createStmt.addColumnDef(new ColumnDef("name", TypeDef.create(PrimitiveType.INT), false,
+                null, true, ColumnDef.DefaultValue.NOT_SET, ""));
+        createStmt.analyze(analyzer);
+
+        String expectedDdl = "CREATE EXTERNAL TABLE `testCluster:db1`.`table1` (\n"
+                + "  `id` int(11) NOT NULL COMMENT \"\",\n"
+                + "  `name` int(11) NULL COMMENT \"\"\n"
+                + ") ENGINE = hudi\n"
+                + "PROPERTIES (\"hudi.database\" = \"doris\",\n"
+                + "\"hudi.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n"
+                + "\"hudi.table\" = \"test\")";
+        Assert.assertEquals(expectedDdl, createStmt.toString());
+    }
}
diff --git a/fe/pom.xml b/fe/pom.xml
index 40406208cf..7c1045346e 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -323,6 +323,12 @@ under the License.
</profiles>
<dependencyManagement>
<dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.8.0</version>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>fe-common</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org