Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/30 01:43:42 UTC

[incubator-doris] branch master updated: [feature](hudi) Step2: Support query hudi external table(include cow and mor table) (#9752)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8092439634 [feature](hudi) Step2: Support query hudi external table(include cow and mor table) (#9752)
8092439634 is described below

commit 8092439634af22f59b73e4029bed9bce770cbd73
Author: dujl <du...@bytedance.com>
AuthorDate: Mon May 30 09:43:36 2022 +0800

    [feature](hudi) Step2: Support query hudi external table(include cow and mor table) (#9752)
    
    Support querying COW (copy-on-write) and MOR (merge-on-read) Hudi tables.
---
 fe/fe-core/pom.xml                                 |   7 +
 .../java/org/apache/doris/analysis/Analyzer.java   |   7 +
 .../java/org/apache/doris/catalog/Catalog.java     |   8 +
 .../doris/catalog/HiveMetaStoreClientHelper.java   |  45 ++-
 .../apache/doris/external/hive/util/HiveUtil.java  | 183 ++++++++++
 .../org/apache/doris/external/hudi/HudiTable.java  |   2 +-
 .../org/apache/doris/external/hudi/HudiUtils.java  |  55 ++-
 .../org/apache/doris/load/BrokerFileGroup.java     |  39 ++-
 .../org/apache/doris/planner/BrokerScanNode.java   |  38 +-
 .../org/apache/doris/planner/HiveScanNode.java     |   2 +-
 .../org/apache/doris/planner/HudiScanNode.java     | 381 +++++++++++++++++++++
 .../org/apache/doris/planner/IcebergScanNode.java  |   5 +-
 .../apache/doris/planner/SingleNodePlanner.java    |   4 +
 .../apache/doris/analysis/CreateTableStmtTest.java |  23 ++
 fe/pom.xml                                         |   6 +
 15 files changed, 752 insertions(+), 53 deletions(-)
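
For context, a minimal sketch of how such a Hudi external table is declared on the FE
side (mirroring the new case in CreateTableStmtTest below). The tblName and analyzer
objects are assumed to come from the test fixture, and the column definitions may be
omitted entirely, in which case the schema is resolved lazily from the Hive metastore:

    Map<String, String> properties = new HashMap<>();
    properties.put("hudi.database", "doris");
    properties.put("hudi.table", "test");
    properties.put("hudi.hive.metastore.uris", "thrift://127.0.0.1:9087");
    CreateTableStmt stmt = new CreateTableStmt(false, true, tblName, "hudi", properties, "");
    stmt.analyze(analyzer);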

diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index b975bdafdd..305e2a4ce6 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -68,6 +68,13 @@ under the License.
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
+            <!-- hadoop -->
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-mapreduce-client</artifactId>
+                <version>2.7.4</version>
+                <scope>compile</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <dependencies>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index cebd197fd5..d858c50b63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -38,6 +38,8 @@ import org.apache.doris.common.IdGenerator;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.VecNotImplException;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.external.hudi.HudiTable;
+import org.apache.doris.external.hudi.HudiUtils;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.RuntimeFilter;
 import org.apache.doris.qe.ConnectContext;
@@ -672,6 +674,11 @@ public class Analyzer {
             }
         }
 
+        if (table.getType() == TableType.HUDI && table.getFullSchema().isEmpty()) {
+            // resolve the hudi table's schema from the hive metastore when the schema in doris meta is empty
+            table = HudiUtils.resolveHudiTable((HudiTable) table);
+        }
+
         // tableName.getTbl() stores the table name specified by the user in the from statement.
         // In the case of case-sensitive table names, the value of tableName.getTbl() is the same as table.getName().
         // However, since the system view is not case-sensitive, table.getName() gets the lowercase view name,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 3404650378..45dc0ad90d 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -4158,6 +4158,14 @@ public class Catalog {
         if (!hudiTable.getFullSchema().isEmpty()) {
             HudiUtils.validateColumns(hudiTable, hiveTable);
         }
+        switch (hiveTable.getTableType()) {
+            case "EXTERNAL_TABLE":
+            case "MANAGED_TABLE":
+                break;
+            case "VIRTUAL_VIEW":
+            default:
+                throw new DdlException("unsupported hudi table type [" + hiveTable.getTableType() + "].");
+        }
         // check hive table if exists in doris database
         if (!db.createTableWithLock(hudiTable, false, stmt.isSetIfNotExists()).first) {
             ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 5b11708bfd..1f2a651d1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -158,7 +158,7 @@ public class HiveMetaStoreClientHelper {
     }
 
     /**
-     * Get data files of partitions in hive table, filter by partition predicate
+     * Get data files of partitions in hive table, filter by partition predicate.
      * @param hiveTable
      * @param hivePartitionPredicate
      * @param fileStatuses
@@ -167,22 +167,13 @@ public class HiveMetaStoreClientHelper {
      * @throws DdlException
      */
     public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
-                                          List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl) throws DdlException {
-        HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
-
+                                          List<TBrokerFileStatus> fileStatuses,
+                                          Table remoteHiveTbl) throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> remoteIterators;
         if (remoteHiveTbl.getPartitionKeys().size() > 0) {
+            String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS);
             // hive partitioned table, get file iterator from table partition sd info
-            List<Partition> hivePartitions = new ArrayList<>();
-            try {
-                client.listPartitionsByExpr(hiveTable.getHiveDb(), hiveTable.getHiveTable(),
-                        SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), null, (short) -1, hivePartitions);
-            } catch (TException e) {
-                LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
-                throw new DdlException("Connect hive metastore failed. Error: " + e.getMessage());
-            } finally {
-                client.close();
-            }
+            List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate);
             remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties());
         } else {
             // hive non-partitioned table, get file iterator from table sd info
@@ -219,6 +210,32 @@ public class HiveMetaStoreClientHelper {
         return hdfsUrl;
     }
 
+    /**
+     * list partitions from hiveMetaStore.
+     *
+     * @param metaStoreUris hiveMetaStore uris
+     * @param remoteHiveTbl Hive table
+     * @param hivePartitionPredicate filter when list partitions
+     * @return a list of hive partitions
+     * @throws DdlException when connecting to hiveMetaStore fails.
+     */
+    public static List<Partition> getHivePartitions(String metaStoreUris, Table remoteHiveTbl,
+                       ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException {
+        List<Partition> hivePartitions = new ArrayList<>();
+        HiveMetaStoreClient client = getClient(metaStoreUris);
+        try {
+            client.listPartitionsByExpr(remoteHiveTbl.getDbName(), remoteHiveTbl.getTableName(),
+                    SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate),
+                    null, (short) -1, hivePartitions);
+        } catch (TException e) {
+            LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
+            throw new DdlException("Connect hive metastore failed. Error: " + e.getMessage());
+        } finally {
+            client.close();
+        }
+        return hivePartitions;
+    }
+
     private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions, Map<String, String> properties) throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
         Configuration configuration = new Configuration(false);
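
A rough usage sketch of the extracted getHivePartitions helper on its own; the database,
table and metastore URI values are placeholders, and the "1=1" predicate mirrors the
dummy partition predicate built in HudiScanNode further down:

    Table remoteHiveTbl = HiveMetaStoreClientHelper.getTable(
            "hudi_db", "hudi_tbl", "thrift://127.0.0.1:9083");
    // dummy predicate that matches every partition
    ExprNodeGenericFuncDesc allPartitions = new HiveMetaStoreClientHelper.ExprBuilder("hudi_tbl")
            .val(TypeInfoFactory.intTypeInfo, 1)
            .val(TypeInfoFactory.intTypeInfo, 1)
            .pred("=", 2).build();
    List<Partition> partitions = HiveMetaStoreClientHelper.getHivePartitions(
            "thrift://127.0.0.1:9083", remoteHiveTbl, allPartitions);
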
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
new file mode 100644
index 0000000000..03d0d5ebec
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.external.hive.util;
+
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Hive util for create or query hive table.
+ */
+public final class HiveUtil {
+    private static final Logger LOG = LogManager.getLogger(HiveUtil.class);
+
+    private HiveUtil() {
+    }
+
+    /**
+     * get input format class from inputFormatName.
+     *
+     * @param configuration jobConf used when getInputFormatClass
+     * @param inputFormatName inputFormat class name
+     * @param symlinkTarget use target inputFormat class when inputFormat is SymlinkTextInputFormat
+     * @return a class of inputFormat.
+     * @throws UserException when the class is not found.
+     */
+    public static InputFormat<?, ?> getInputFormat(Configuration configuration,
+                                                   String inputFormatName, boolean symlinkTarget) throws UserException {
+        try {
+            JobConf jobConf = new JobConf(configuration);
+
+            Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
+            if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) {
+                // symlink targets are always TextInputFormat
+                inputFormatClass = TextInputFormat.class;
+            }
+
+            return ReflectionUtils.newInstance(inputFormatClass, jobConf);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            throw new UserException("Unable to create input format " + inputFormatName, e);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "RedundantCast"})
+    private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
+            throws ClassNotFoundException {
+        // CDH uses different names for Parquet
+        if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName)
+                || "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
+            return MapredParquetInputFormat.class;
+        }
+
+        Class<?> clazz = conf.getClassByName(inputFormatName);
+        return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
+    }
+
+    /**
+     * transform hiveSchema to Doris schema.
+     *
+     * @param hiveSchema hive schema
+     * @return doris schema
+     * @throws AnalysisException when transform failed.
+     */
+    public static List<Column> transformHiveSchema(List<FieldSchema> hiveSchema) throws AnalysisException {
+        List<Column> newSchema = Lists.newArrayList();
+        for (FieldSchema hiveColumn : hiveSchema) {
+            try {
+                newSchema.add(HiveUtil.transformHiveField(hiveColumn));
+            } catch (UnsupportedOperationException e) {
+                LOG.warn("Unsupported hive data type for column[{}], error: {}",
+                        hiveColumn.getName(), e.getMessage());
+                throw e;
+            }
+        }
+        return newSchema;
+    }
+
+    /**
+     * transform hive field to doris column.
+     *
+     * @param field hive field to be transformed
+     * @return doris column
+     */
+    public static Column transformHiveField(FieldSchema field) {
+        TypeInfo hiveTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
+        Type type = convertHiveTypeToDoris(hiveTypeInfo);
+        return new Column(field.getName(), type, false, null, true, null, field.getComment());
+    }
+
+    private static Type convertHiveTypeToDoris(TypeInfo hiveTypeInfo) {
+        switch (hiveTypeInfo.getCategory()) {
+            case PRIMITIVE: {
+                PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) hiveTypeInfo;
+                switch (primitiveTypeInfo.getPrimitiveCategory()) {
+                    case VOID:
+                        return Type.NULL;
+                    case BOOLEAN:
+                        return Type.BOOLEAN;
+                    case BYTE:
+                        return Type.TINYINT;
+                    case SHORT:
+                        return Type.SMALLINT;
+                    case INT:
+                        return Type.INT;
+                    case LONG:
+                        return Type.BIGINT;
+                    case FLOAT:
+                        return Type.FLOAT;
+                    case DOUBLE:
+                        return Type.DOUBLE;
+                    case STRING:
+                        return Type.STRING;
+                    case CHAR:
+                        return Type.CHAR;
+                    case VARCHAR:
+                        return Type.VARCHAR;
+                    case DATE:
+                        return Type.DATE;
+                    case TIMESTAMP:
+                        return Type.DATETIME;
+                    case DECIMAL:
+                        return Type.DECIMALV2;
+                    default:
+                        throw new UnsupportedOperationException("Unsupported type: "
+                            + primitiveTypeInfo.getPrimitiveCategory());
+                }
+            }
+            case LIST:
+                TypeInfo elementTypeInfo = ((ListTypeInfo) hiveTypeInfo)
+                        .getListElementTypeInfo();
+                Type newType = null;
+                if (elementTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+                    newType = convertHiveTypeToDoris(elementTypeInfo);
+                } else {
+                    throw new UnsupportedOperationException("Unsupported type: " + hiveTypeInfo.toString());
+                }
+                return new ArrayType(newType);
+            case MAP:
+            case STRUCT:
+            case UNION:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + hiveTypeInfo.toString());
+        }
+    }
+
+}
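
A small sketch of the schema mapping this new utility performs; the Hive columns are
hypothetical, and the Doris types follow convertHiveTypeToDoris above (all columns are
created as nullable):

    List<FieldSchema> hiveSchema = Lists.newArrayList(
            new FieldSchema("id", "bigint", ""),
            new FieldSchema("name", "string", ""),
            new FieldSchema("tags", "array<int>", ""));
    // id -> BIGINT, name -> STRING, tags -> ARRAY<INT>
    List<Column> dorisSchema = HiveUtil.transformHiveSchema(hiveSchema);
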
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java
index 0e8e350eb3..cfc491e35e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java
@@ -117,7 +117,7 @@ public class HudiTable extends Table {
         thriftHudiTable.setTableName(getHmsTableName());
         thriftHudiTable.setProperties(getTableProperties());
 
-        TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.HUDI_TABLE,
+        TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
                 fullSchema.size(), 0, getName(), "");
         thriftTableDescriptor.setHudiTable(thriftHudiTable);
         return thriftTableDescriptor;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
index b1a65ad41d..757fdc2d45 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
@@ -17,21 +17,31 @@
 
 package org.apache.doris.external.hudi;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.external.hive.util.HiveUtil;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Hudi utils.
  */
 public class HudiUtils {
+    private static final Logger LOG = LogManager.getLogger(HudiUtils.class);
 
     private static final String PROPERTY_MISSING_MSG =
             "Hudi table %s is null. Please add properties('%s'='xxx') when create table";
@@ -117,12 +127,53 @@ public class HudiUtils {
         Set<String> hudiColumnNames = table.getFullSchema().stream()
                 .map(x -> x.getName()).collect(Collectors.toSet());
 
-        Set<String> hiveTableColumnNames = hiveTable.getSd().getCols()
-                .stream().map(x -> x.getName()).collect(Collectors.toSet());
+        Set<String> hiveTableColumnNames =
+                Stream.concat(hiveTable.getSd().getCols().stream(), hiveTable.getPartitionKeys().stream())
+                .map(x -> x.getName()).collect(Collectors.toSet());
         hudiColumnNames.removeAll(hiveTableColumnNames);
         if (hudiColumnNames.size() > 0) {
             throw new DdlException(String.format("Hudi table's column(s): {%s} didn't exist in hive table. ",
                     String.join(", ", hudiColumnNames)));
         }
     }
+
+    /**
+     * resolve hudi table from hive metaStore.
+     *
+     * @param table a doris hudi table
+     * @return a doris hudi table which has been resolved.
+     * @throws AnalysisException when the remote table does not exist or is not a hudi table
+     */
+    public static HudiTable resolveHudiTable(HudiTable table) throws AnalysisException {
+        String metastoreUris = table.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
+        org.apache.hadoop.hive.metastore.api.Table remoteHiveTable = null;
+        try {
+            remoteHiveTable = HiveMetaStoreClientHelper.getTable(
+                    table.getHmsDatabaseName(),
+                    table.getHmsTableName(),
+                    metastoreUris);
+        } catch (DdlException e) {
+            LOG.error("Failed to get table from HiveMetaStore", e);
+            throw new AnalysisException(ErrorCode.ERR_UNKNOWN_ERROR.formatErrorMsg());
+        }
+        if (remoteHiveTable == null) {
+            throw new AnalysisException(ErrorCode.ERR_UNKNOWN_TABLE.formatErrorMsg(table.getHmsTableName(),
+                    "HiveMetaStore"));
+        }
+        if (!HudiUtils.isHudiTable(remoteHiveTable)) {
+            throw new AnalysisException(ErrorCode.ERR_UNKNOWN_TABLE.formatErrorMsg(table.getHmsTableName(),
+                    "HiveMetaStore"));
+        }
+
+        List<Column> newSchema = HiveUtil.transformHiveSchema(remoteHiveTable.getSd().getCols());
+        HudiTable tableWithSchema = new HudiTable(table.getId(),
+                table.getName(),
+                newSchema,
+                table.getTableProperties());
+        return tableWithSchema;
+    }
 }
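
The lazy schema resolution added here is driven from Analyzer (see the change above); a
minimal sketch, where hudiTable stands for a schema-less HudiTable loaded from Doris meta:

    if (hudiTable.getFullSchema().isEmpty()) {
        // rebuild the HudiTable with the column list fetched from the Hive metastore
        hudiTable = HudiUtils.resolveHudiTable(hudiTable);
    }
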
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 8d0d37643b..ab402d728c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -27,7 +27,6 @@ import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.HiveTable;
-import org.apache.doris.catalog.IcebergTable;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.OlapTable.OlapTableState;
@@ -37,7 +36,6 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.load.loadv2.LoadTask;
@@ -76,6 +74,7 @@ public class BrokerFileGroup implements Writable {
     private List<Long> fileSize;
 
     private List<String> fileFieldNames;
+    // partition columnNames
     private List<String> columnsFromPath;
     // columnExprList includes all fileFieldNames, columnsFromPath and column mappings
     // this param will be recreated by data desc when the log replay
@@ -119,15 +118,26 @@ public class BrokerFileGroup implements Writable {
         this.fileFormat = table.getFileFormat();
     }
 
-    // Used for hive table, no need to parse
-    public BrokerFileGroup(HiveTable table,
-                           String columnSeparator,
-                           String lineDelimiter,
+    /**
+     * Should be used for hive/iceberg/hudi external tables.
+     */
+    public BrokerFileGroup(long tableId,
                            String filePath,
-                           String fileFormat,
-                           List<String> columnsFromPath,
-                           List<ImportColumnDesc> columnExprList) throws AnalysisException {
-        this.tableId = table.getId();
+                           String fileFormat) throws AnalysisException {
+        this(tableId, "|", "\n", filePath, fileFormat, null, null);
+    }
+
+    /**
+     * Should be used for hive/iceberg/hudi external tables.
+     */
+    public BrokerFileGroup(long tableId,
+            String columnSeparator,
+            String lineDelimiter,
+            String filePath,
+            String fileFormat,
+            List<String> columnsFromPath,
+            List<ImportColumnDesc> columnExprList) throws AnalysisException {
+        this.tableId = tableId;
         this.valueSeparator = Separator.convertSeparator(columnSeparator);
         this.lineDelimiter = Separator.convertSeparator(lineDelimiter);
         this.isNegative = false;
@@ -137,15 +147,6 @@ public class BrokerFileGroup implements Writable {
         this.columnExprList = columnExprList;
     }
 
-    // Used for iceberg table, no need to parse
-    public BrokerFileGroup(IcebergTable table) throws UserException {
-        this.tableId = table.getId();
-        this.isNegative = false;
-        this.valueSeparator = "|";
-        this.lineDelimiter = "\n";
-        this.fileFormat = table.getFileFormat();
-    }
-
     public BrokerFileGroup(DataDescription dataDescription) {
         this.fileFieldNames = dataDescription.getFileFieldNames();
         this.columnsFromPath = dataDescription.getColumnsFromPath();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 14cf1164bc..dc4979c2e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -134,7 +134,7 @@ public class BrokerScanNode extends LoadScanNode {
 
     private Analyzer analyzer;
 
-    private static class ParamCreateContext {
+    protected static class ParamCreateContext {
         public BrokerFileGroup fileGroup;
         public TBrokerScanRangeParams params;
         public TupleDescriptor srcTupleDescriptor;
@@ -190,6 +190,10 @@ public class BrokerScanNode extends LoadScanNode {
         }
     }
 
+    public List<ParamCreateContext> getParamCreateContexts() {
+        return paramCreateContexts;
+    }
+
     protected void initFileGroup() throws UserException {
         BrokerTable brokerTable = (BrokerTable) desc.getTable();
         try {
@@ -277,13 +281,15 @@ public class BrokerScanNode extends LoadScanNode {
             }
         }
 
-        Load.initColumns(targetTable, columnDescs,
-                context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer,
-                context.srcTupleDescriptor, context.slotDescByName, context.params,
-                formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized());
+        if (targetTable != null) {
+            Load.initColumns(targetTable, columnDescs,
+                    context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer,
+                    context.srcTupleDescriptor, context.slotDescByName, context.params,
+                    formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized());
+        }
     }
 
-    private TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDesc brokerDesc)
+    protected TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDesc brokerDesc)
             throws UserException {
 
         Backend selectedBackend;
@@ -332,10 +338,6 @@ public class BrokerScanNode extends LoadScanNode {
         return locations;
     }
 
-    private TBrokerScanRange brokerScanRange(TScanRangeLocations locations) {
-        return locations.scan_range.broker_scan_range;
-    }
-
     private void getFileStatusAndCalcInstance() throws UserException {
         if (fileStatusesList == null || filesAdded == -1) {
             // FIXME(cmy): fileStatusesList and filesAdded can be set out of db lock when doing pull load,
@@ -346,7 +348,10 @@ public class BrokerScanNode extends LoadScanNode {
             filesAdded = 0;
             this.getFileStatus();
         }
-        Preconditions.checkState(fileStatusesList.size() == fileGroups.size());
+        // HudiScanNode calculates its scan ranges in its own way and does not need fileStatusesList
+        if (!(this instanceof HudiScanNode)) {
+            Preconditions.checkState(fileStatusesList.size() == fileGroups.size());
+        }
 
         if (isLoad() && filesAdded == 0) {
             throw new UserException("No source file in this table(" + targetTable.getName() + ").");
@@ -524,7 +529,7 @@ public class BrokerScanNode extends LoadScanNode {
                         rangeDesc.setNumAsString(context.fileGroup.isNumAsString());
                         rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine());
                     }
-                    brokerScanRange(curLocations).addToRanges(rangeDesc);
+                    curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
                     curFileOffset += rangeBytes;
 
                 } else {
@@ -537,7 +542,7 @@ public class BrokerScanNode extends LoadScanNode {
                     }
 
                     rangeDesc.setReadByColumnDef(true);
-                    brokerScanRange(curLocations).addToRanges(rangeDesc);
+                    curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
                     curFileOffset = 0;
                     i++;
                 }
@@ -565,7 +570,7 @@ public class BrokerScanNode extends LoadScanNode {
                 }
 
                 rangeDesc.setReadByColumnDef(true);
-                brokerScanRange(curLocations).addToRanges(rangeDesc);
+                curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
                 curFileOffset = 0;
                 curInstanceBytes += leftBytes;
                 i++;
@@ -573,7 +578,7 @@ public class BrokerScanNode extends LoadScanNode {
         }
 
         // Put the last file
-        if (brokerScanRange(curLocations).isSetRanges()) {
+        if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) {
             locationsList.add(curLocations);
         }
     }
@@ -589,7 +594,10 @@ public class BrokerScanNode extends LoadScanNode {
         rangeDesc.setSplittable(fileStatus.isSplitable);
         rangeDesc.setStartOffset(curFileOffset);
         rangeDesc.setSize(rangeBytes);
+        // fileSize is only used when the format is orc or parquet and TFileType is broker.
+        // For other TFileType values it is not necessary.
         rangeDesc.setFileSize(fileStatus.size);
+        // The Backend will append columnsFromPath to the end of each row after the data is scanned from the file.
         rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile);
         rangeDesc.setColumnsFromPath(columnsFromPath);
         rangeDesc.setHeaderType(headerType);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
index c22b4842e2..ee9b1603bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
@@ -115,7 +115,7 @@ public class HiveScanNode extends BrokerScanNode {
 
         HiveTable hiveTable = (HiveTable) desc.getTable();
         fileGroups = Lists.newArrayList(
-                new BrokerFileGroup(hiveTable,
+                new BrokerFileGroup(hiveTable.getId(),
                         getColumnSeparator(),
                         getLineDelimiter(),
                         getPath(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
new file mode 100644
index 0000000000..756441f396
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ImportColumnDesc;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.external.hive.util.HiveUtil;
+import org.apache.doris.external.hudi.HudiProperty;
+import org.apache.doris.external.hudi.HudiTable;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerRangeDesc;
+import org.apache.doris.thrift.TBrokerScanRangeParams;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.THdfsParams;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Hudi scan node to query hudi table.
+ */
+public class HudiScanNode extends BrokerScanNode {
+    private static final Logger LOG = LogManager.getLogger(HudiScanNode.class);
+
+    private HudiTable hudiTable;
+    // partition column predicates of hive table
+    private List<ExprNodeDesc> hivePredicates = new ArrayList<>();
+    private ExprNodeGenericFuncDesc hivePartitionPredicate;
+    private List<ImportColumnDesc> parsedColumnExprList = new ArrayList<>();
+    private String hdfsUri;
+
+    private Table remoteHiveTable;
+
+    /* hudi table properties */
+    private String fileFormat;
+    private String inputFormatName;
+    private String basePath;
+    private List<String> partitionKeys = new ArrayList<>();
+    /* hudi table properties */
+
+    private List<TScanRangeLocations> scanRangeLocations;
+
+    public HudiScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
+                        List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
+        super(id, destTupleDesc, planNodeName, fileStatusesList, filesAdded);
+        this.hudiTable = (HudiTable) destTupleDesc.getTable();
+    }
+
+    public String getHdfsUri() {
+        return hdfsUri;
+    }
+
+    public List<ImportColumnDesc> getParsedColumnExprList() {
+        return parsedColumnExprList;
+    }
+
+    public String getFileFormat() {
+        return fileFormat;
+    }
+
+    public String getBasePath() {
+        return basePath;
+    }
+
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    /**
+     * super.init() will invoke initFileGroup(), which will:
+     * 1. get the hudi table from the hive metastore
+     * 2. resolve the hudi table type, query mode, base path and partition column information
+     * 3. generate the fileGroup
+     *
+     * @param analyzer analyzer
+     * @throws UserException when init failed.
+     */
+    @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+        // init scan range params
+        initParams(analyzer);
+    }
+
+    @Override
+    public int getNumInstances() {
+        return scanRangeLocations.size();
+    }
+
+    @Override
+    protected void initFileGroup() throws UserException {
+        resolveHiveTable();
+        analyzeColumnFromPath();
+
+        HudiTable hudiTable = (HudiTable) desc.getTable();
+        fileGroups = Lists.newArrayList(
+                new BrokerFileGroup(hudiTable.getId(),
+                        "\t",
+                        "\n",
+                        getBasePath(),
+                        getFileFormat(),
+                        getPartitionKeys(),
+                        getParsedColumnExprList()));
+        brokerDesc = new BrokerDesc("HudiTableDesc", StorageBackend.StorageType.HDFS, hudiTable.getTableProperties());
+    }
+
+    /**
+     * Override this function just to skip the parent's getFileStatus.
+     */
+    @Override
+    protected void getFileStatus() throws DdlException {
+        if (partitionKeys.size() > 0) {
+            extractHivePartitionPredicate();
+        }
+        // set fileStatusesList to empty; HudiScanNode does not need it
+        fileStatusesList = Lists.newArrayList();
+        filesAdded = 0;
+    }
+
+    @Override
+    public void finalize(Analyzer analyzer) throws UserException {
+        try {
+            ParamCreateContext context = getParamCreateContexts().get(0);
+            finalizeParams(context.slotDescByName, context.exprMap, context.params,
+                    context.srcTupleDescriptor, false, context.fileGroup.isNegative(), analyzer);
+        } catch (AnalysisException e) {
+            throw new UserException(e.getMessage());
+        }
+        try {
+            buildScanRange();
+        } catch (IOException e) {
+            LOG.error("Build scan range failed.", e);
+            throw new UserException("Build scan range failed.", e);
+        }
+    }
+
+    @Override
+    public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
+        return scanRangeLocations;
+    }
+
+    private void resolveHiveTable() throws DdlException {
+        this.remoteHiveTable = HiveMetaStoreClientHelper.getTable(
+                hudiTable.getHmsDatabaseName(),
+                hudiTable.getHmsTableName(),
+                hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS));
+
+        this.inputFormatName = remoteHiveTable.getSd().getInputFormat();
+        this.fileFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(this.inputFormatName);
+        this.basePath = remoteHiveTable.getSd().getLocation();
+        for (FieldSchema fieldSchema : remoteHiveTable.getPartitionKeys()) {
+            this.partitionKeys.add(fieldSchema.getName());
+        }
+        LOG.info("Hudi inputFormat is {}, basePath is {}", inputFormatName, this.basePath);
+    }
+
+    private void initParams(Analyzer analyzer) {
+        ParamCreateContext context = getParamCreateContexts().get(0);
+        TBrokerScanRangeParams params = context.params;
+
+        Map<String, SlotDescriptor> slotDescByName = Maps.newHashMap();
+
+        List<Column> columns = hudiTable.getBaseSchema(false);
+        // create a nullable VARCHAR slot descriptor for each column of the hudi table's base schema
+        for (Column column : columns) {
+            SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor);
+            slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+            slotDesc.setIsMaterialized(true);
+            slotDesc.setIsNullable(true);
+            slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR));
+            params.addToSrcSlotIds(slotDesc.getId().asInt());
+            slotDescByName.put(column.getName(), slotDesc);
+        }
+        context.slotDescByName = slotDescByName;
+    }
+
+    /**
+     * Extracts partition predicate from SelectStmt.whereClause that can be pushed down to Hive.
+     */
+    private void extractHivePartitionPredicate() throws DdlException {
+        ListIterator<Expr> it = conjuncts.listIterator();
+        while (it.hasNext()) {
+            ExprNodeGenericFuncDesc hiveExpr = HiveMetaStoreClientHelper.convertToHivePartitionExpr(
+                    it.next(), partitionKeys, hudiTable.getName());
+            if (hiveExpr != null) {
+                hivePredicates.add(hiveExpr);
+            }
+        }
+        int count = hivePredicates.size();
+        // combine all predicates with `and`
+        // getCompoundExpr requires at least 2 predicates
+        if (count >= 2) {
+            hivePartitionPredicate = HiveMetaStoreClientHelper.getCompoundExpr(hivePredicates, "and");
+        } else if (count == 1) {
+            // only one predicate
+            hivePartitionPredicate = (ExprNodeGenericFuncDesc) hivePredicates.get(0);
+        } else {
+            // no predicate: make a dummy predicate "1=1" to get all partitions
+            HiveMetaStoreClientHelper.ExprBuilder exprBuilder =
+                    new HiveMetaStoreClientHelper.ExprBuilder(hudiTable.getName());
+            hivePartitionPredicate = exprBuilder.val(TypeInfoFactory.intTypeInfo, 1)
+                    .val(TypeInfoFactory.intTypeInfo, 1)
+                    .pred("=", 2).build();
+        }
+    }
+
+    private InputSplit[] getSplits() throws UserException, IOException {
+        String splitsPath = basePath;
+        if (partitionKeys.size() > 0) {
+            extractHivePartitionPredicate();
+
+            String metaStoreUris = hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
+            List<Partition> hivePartitions =
+                    HiveMetaStoreClientHelper.getHivePartitions(metaStoreUris, remoteHiveTable, hivePartitionPredicate);
+            splitsPath = hivePartitions.stream()
+                    .map(x -> x.getSd().getLocation()).collect(Collectors.joining(","));
+        }
+
+        Configuration configuration = new Configuration();
+        InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
+        // always get fileSplits from the input format,
+        // because all hoodie input formats have the UseFileSplitsFromInputFormat annotation
+        JobConf jobConf = new JobConf(configuration);
+        FileInputFormat.setInputPaths(jobConf, splitsPath);
+        InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 0);
+        return inputSplits;
+    }
+
+    // If fileFormat is not null, we use fileFormat instead of checking the file's suffix
+    protected void buildScanRange() throws UserException, IOException {
+        scanRangeLocations = Lists.newArrayList();
+        InputSplit[] inputSplits = getSplits();
+        if (inputSplits.length == 0) {
+            return;
+        }
+
+        THdfsParams hdfsParams = new THdfsParams();
+        String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
+        String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
+        String fsName = fullPath.replace(filePath, "");
+        hdfsParams.setFsName(fsName);
+        LOG.debug("Hudi path's fs name is {}", fsName);
+
+        TFileFormatType formatType = null;
+        if (this.inputFormatName.toUpperCase(Locale.ROOT).contains("PARQUET")) {
+            formatType = TFileFormatType.FORMAT_PARQUET;
+        } else if (this.inputFormatName.toUpperCase(Locale.ROOT).contains("ORC")) {
+            formatType = TFileFormatType.FORMAT_ORC;
+        } else {
+            throw new UserException("unsupported hudi input format [" + this.inputFormatName + "].");
+        }
+        }
+
+        ParamCreateContext context = getParamCreateContexts().get(0);
+        for (InputSplit split : inputSplits) {
+            FileSplit fileSplit = (FileSplit) split;
+
+            TScanRangeLocations curLocations = newLocations(context.params, brokerDesc);
+            List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
+                    getPartitionKeys());
+            int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size();
+
+            TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, formatType,
+                    partitionValuesFromPath, numberOfColumnsFromFile, brokerDesc);
+            rangeDesc.setHdfsParams(hdfsParams);
+            rangeDesc.setReadByColumnDef(true);
+
+            curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
+            LOG.debug("Assign to backend {} with hudi split: {} ({}, {})",
+                    curLocations.getLocations().get(0).getBackendId(),
+                    fileSplit.getPath(), fileSplit.getStart(), fileSplit.getLength());
+
+            // keep this location only if at least one range was added to it
+            if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) {
+                scanRangeLocations.add(curLocations);
+            }
+        }
+    }
+
+    private TBrokerRangeDesc createBrokerRangeDesc(FileSplit fileSplit,
+                                                   TFileFormatType formatType,
+                                                   List<String> columnsFromPath, int numberOfColumnsFromFile,
+                                                   BrokerDesc brokerDesc) {
+        TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
+        rangeDesc.setFileType(brokerDesc.getFileType());
+        rangeDesc.setFormatType(formatType);
+        rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
+        rangeDesc.setSplittable(true);
+        rangeDesc.setStartOffset(fileSplit.getStart());
+        rangeDesc.setSize(fileSplit.getLength());
+        rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile);
+        rangeDesc.setColumnsFromPath(columnsFromPath);
+        // set hdfs params for hdfs file type.
+        switch (brokerDesc.getFileType()) {
+            case FILE_HDFS:
+                BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc);
+                break;
+            default:
+                break;
+        }
+        return rangeDesc;
+    }
+
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
+        StringBuilder output = new StringBuilder();
+        if (!isLoad()) {
+            output.append(prefix).append("TABLE: ").append(hudiTable.getName()).append("\n");
+            output.append(prefix).append("PATH: ")
+                    .append(hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS)).append("\n");
+        }
+        return output.toString();
+    }
+
+    /**
+     * Analyze the columns from path, i.e. the partition columns.
+     */
+    private void analyzeColumnFromPath() {
+        for (String colName : partitionKeys) {
+            ImportColumnDesc importColumnDesc = new ImportColumnDesc(colName, null);
+            parsedColumnExprList.add(importColumnDesc);
+        }
+    }
+}
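
For reference, a stripped-down sketch of what getSplits() above does for a partitioned
table; the input format class name would come from the HMS storage descriptor, and the
partition path is a made-up example:

    Configuration conf = new Configuration();
    InputFormat<?, ?> format = HiveUtil.getInputFormat(
            conf, "org.apache.hudi.hadoop.HoodieParquetInputFormat", false);
    JobConf jobConf = new JobConf(conf);
    // for partitioned tables, the input paths are the locations of the partitions
    // that survive the pushed-down partition predicate
    FileInputFormat.setInputPaths(jobConf, "hdfs://nameservice1/warehouse/hudi_tbl/dt=2022-05-30");
    InputSplit[] splits = format.getSplits(jobConf, 0);
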
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
index 6bdda5272e..4af73caf51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
@@ -57,7 +57,10 @@ public class IcebergScanNode extends BrokerScanNode {
 
     @Override
     protected void initFileGroup() throws UserException {
-        fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable));
+        fileGroups = Lists.newArrayList(
+            new BrokerFileGroup(icebergTable.getId(),
+                null,
+                icebergTable.getFileFormat()));
         brokerDesc = new BrokerDesc("IcebergTableDesc", icebergTable.getStorageType(),
                 icebergTable.getIcebergProperties());
         targetTable = icebergTable;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index b6aec38c8e..acf6702f70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1708,6 +1708,10 @@ public class SingleNodePlanner {
                 scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode",
                         null, -1);
                 break;
+            case HUDI:
+                scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HudiScanNode",
+                        null, -1);
+                break;
             default:
                 break;
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
index 83c85251ae..1400d8cdd7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
@@ -291,4 +291,27 @@ public class CreateTableStmtTest {
                 + "\"hudi.hive.metastore.uris\"  =  \"thrift://127.0.0.1:9087\",\n"
                 + "\"hudi.table\"  =  \"test\")", stmt.toString());
     }
+
+    @Test
+    public void testCreateHudiTableWithSchema() throws UserException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("hudi.database", "doris");
+        properties.put("hudi.table", "test");
+        properties.put("hudi.hive.metastore.uris", "thrift://127.0.0.1:9087");
+        CreateTableStmt stmt = new CreateTableStmt(false, true, tblName, "hudi", properties, "");
+        ColumnDef idCol = new ColumnDef("id", TypeDef.create(PrimitiveType.INT));
+        stmt.addColumnDef(idCol);
+        ColumnDef nameCol = new ColumnDef("name", TypeDef.create(PrimitiveType.INT), false,
+                null, true, ColumnDef.DefaultValue.NOT_SET, "");
+        stmt.addColumnDef(nameCol);
+        stmt.analyze(analyzer);
+
+        Assert.assertEquals("CREATE EXTERNAL TABLE `testCluster:db1`.`table1` (\n"
+                + "  `id` int(11) NOT NULL COMMENT \"\",\n"
+                + "  `name` int(11) NULL COMMENT \"\"\n"
+                + ") ENGINE = hudi\n"
+                + "PROPERTIES (\"hudi.database\"  =  \"doris\",\n"
+                + "\"hudi.hive.metastore.uris\"  =  \"thrift://127.0.0.1:9087\",\n"
+                + "\"hudi.table\"  =  \"test\")", stmt.toString());
+    }
 }
diff --git a/fe/pom.xml b/fe/pom.xml
index 40406208cf..7c1045346e 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -323,6 +323,12 @@ under the License.
     </profiles>
     <dependencyManagement>
         <dependencies>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-client</artifactId>
+                <version>2.8.0</version>
+                <scope>compile</scope>
+            </dependency>
             <dependency>
                 <groupId>${project.groupId}</groupId>
                 <artifactId>fe-common</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org