Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/20 09:46:30 UTC

[doris] branch master updated: [feature-wip](multi-catalog) External file scan node (#9973)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a8d24b4a6 [feature-wip](multi-catalog) External file scan node (#9973)
8a8d24b4a6 is described below

commit 8a8d24b4a6bd0aad2eeb612aff2c32aa8724b1de
Author: huangzhaowei <hu...@bytedance.com>
AuthorDate: Mon Jun 20 17:46:24 2022 +0800

    [feature-wip](multi-catalog) External file scan node (#9973)
---
 .../org/apache/doris/analysis/DescriptorTable.java |   2 +-
 .../org/apache/doris/analysis/SlotDescriptor.java  |   7 +-
 .../java/org/apache/doris/analysis/SlotRef.java    |   5 +-
 .../java/org/apache/doris/analysis/TableRef.java   |   2 +-
 .../org/apache/doris/analysis/TupleDescriptor.java |   9 +-
 .../org/apache/doris/catalog/IcebergTable.java     |   1 -
 .../doris/catalog/external/HMSExternalTable.java   |   7 +
 .../org/apache/doris/planner/HashJoinNode.java     |   4 +-
 .../planner/external/ExternalFileScanNode.java     | 407 +++++++++++++++++++++
 .../planner/external/ExternalFileScanProvider.java |  49 +++
 .../planner/external/ExternalHiveScanProvider.java | 149 ++++++++
 .../planner/external/ExternalHudiScanProvider.java |  38 ++
 .../external/ExternalIcebergScanProvider.java      | 105 ++++++
 13 files changed, 768 insertions(+), 17 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
index 00139913d9..ed7ba00dd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
@@ -161,7 +161,7 @@ public class DescriptorTable {
                 // but its table has no id
                 if (tupleD.getTable() != null
                         && tupleD.getTable().getId() >= 0) {
-                    referencedTbls.add(tupleD.getTable());
+                    referencedTbls.add((Table) tupleD.getTable());
                 }
                 for (SlotDescriptor slotD : tupleD.getMaterializedSlots()) {
                     result.addToSlotDescriptors(slotD.toThrift());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index 9cdd0b23b7..1eb41ae6cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -329,10 +329,7 @@ public class SlotDescriptor {
     }
 
     public boolean isScanSlot() {
-        Table table = parent.getTable();
-        if ((table != null) && (table instanceof OlapTable)) {
-            return true;
-        }
-        return false;
+        Table table = (Table) parent.getTable();
+        return table instanceof OlapTable;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
index 6e8017e3bd..706d82e79c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
@@ -373,7 +373,7 @@ public class SlotRef extends Expr {
                 expr.getTableIdToColumnNames(tableIdToColumnNames);
             }
         } else {
-            Table table = desc.getParent().getTable();
+            Table table = (Table) desc.getParent().getTable();
             if (table == null) {
                 // Maybe this column comes from inline view.
                 return;
@@ -390,8 +390,7 @@ public class SlotRef extends Expr {
 
     public Table getTable() {
         Preconditions.checkState(desc != null);
-        Table table = desc.getParent().getTable();
-        return table;
+        return (Table) desc.getParent().getTable();
     }
 
     public void setLabel(String label) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 1d15ea6fe4..107a9a3637 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -301,7 +301,7 @@ public class TableRef implements ParseNode, Writable {
     }
 
     public Table getTable() {
-        return desc.getTable();
+        return (Table) desc.getTable();
     }
 
     public void setUsingClause(List<String> colNames) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java
index 5bfc261713..96aaa615cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java
@@ -23,6 +23,7 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.ColumnStats;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.thrift.TTupleDescriptor;
 
 import com.google.common.base.Joiner;
@@ -45,7 +46,7 @@ public class TupleDescriptor {
     private final ArrayList<SlotDescriptor> slots;
 
     // underlying table, if there is one
-    private Table table;
+    private TableIf table;
     // underlying table, if there is one
     private TableRef ref;
 
@@ -151,11 +152,11 @@ public class TupleDescriptor {
         return null;
     }
 
-    public Table getTable() {
+    public TableIf getTable() {
         return table;
     }
 
-    public void setTable(Table tbl) {
+    public void setTable(TableIf tbl) {
         table = tbl;
     }
 
@@ -352,7 +353,7 @@ public class TupleDescriptor {
             if (slotDescriptor.getColumn() != null) {
                 TupleDescriptor parent = slotDescriptor.getParent();
                 Preconditions.checkState(parent != null);
-                Table table = parent.getTable();
+                Table table = (Table) parent.getTable();
                 Preconditions.checkState(table != null);
                 Long tableId = table.getId();
                 Set<String> columnNames = tableIdToColumnNames.get(tableId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
index a575ab110b..eb748ed807 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
@@ -171,7 +171,6 @@ public class IcebergTable extends Table {
         return fileFormat;
     }
 
-    // get the iceberg table instance, if table is not loaded, load it.
     private org.apache.iceberg.Table getTable() throws Exception {
         if (isLoaded.get()) {
             Preconditions.checkNotNull(icebergTable);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 8c5ea18d06..c4167b89df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -131,4 +131,11 @@ public class HMSExternalTable extends ExternalTable {
         }
         return null;
     }
+
+    /**
+     * Get the database name of the HMS table.
+     */
+    public String getDbName() {
+        return dbName;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 38a9e88e3c..9192c0981f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -363,13 +363,13 @@ public class HashJoinNode extends PlanNode {
         }
 
         public double lhsNumRows() {
-            Table table = lhs.getParent().getTable();
+            Table table = (Table) lhs.getParent().getTable();
             Preconditions.checkState(table instanceof OlapTable);
             return ((OlapTable) (table)).getRowCount();
         }
 
         public double rhsNumRows() {
-            Table table = rhs.getParent().getTable();
+            Table table = (Table) rhs.getParent().getTable();
             Preconditions.checkState(table instanceof OlapTable);
             return ((OlapTable) (table)).getRowCount();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
new file mode 100644
index 0000000000..8be039b30f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.mysql.privilege.UserProperty;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
+import org.apache.doris.thrift.TBrokerRangeDesc;
+import org.apache.doris.thrift.TBrokerScanNode;
+import org.apache.doris.thrift.TBrokerScanRange;
+import org.apache.doris.thrift.TBrokerScanRangeParams;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.THdfsParams;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeLocation;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * ExternalFileScanNode for file-based datasources. Currently only Hive, Hudi and Iceberg are supported.
+ */
+public class ExternalFileScanNode extends ExternalScanNode {
+    private static final Logger LOG = LogManager.getLogger(ExternalFileScanNode.class);
+
+    private static final String HIVE_DEFAULT_COLUMN_SEPARATOR = "\001";
+
+    private static final String HIVE_DEFAULT_LINE_DELIMITER = "\n";
+
+    private static class ParamCreateContext {
+        public TBrokerScanRangeParams params;
+        public TupleDescriptor srcTupleDescriptor;
+        public Map<String, SlotDescriptor> slotDescByName;
+    }
+
+    private static class BackendPolicy {
+        private final List<Backend> backends = Lists.newArrayList();
+
+        private int nextBe = 0;
+
+        public void init() throws UserException {
+            Set<Tag> tags = Sets.newHashSet();
+            if (ConnectContext.get().getCurrentUserIdentity() != null) {
+                String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
+                tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(qualifiedUser);
+                if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+                    throw new UserException("No valid resource tag for user: " + qualifiedUser);
+                }
+            } else {
+                LOG.debug("user info in ExternalFileScanNode should not be null; logging here for observation");
+            }
+
+            // scan node is used for query
+            BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
+                    .needQueryAvailable()
+                    .needLoadAvailable()
+                    .addTags(tags)
+                    .build();
+            for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) {
+                if (policy.isMatch(be)) {
+                    backends.add(be);
+                }
+            }
+            if (backends.isEmpty()) {
+                throw new UserException("No available backends");
+            }
+            Random random = new Random(System.currentTimeMillis());
+            Collections.shuffle(backends, random);
+        }
+
+        public Backend getNextBe() {
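+            // Round-robin over the shuffled backend list.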
+            Backend selectedBackend = backends.get(nextBe++);
+            nextBe = nextBe % backends.size();
+            return selectedBackend;
+        }
+    }
+
+    private enum DLAType {
+        HIVE,
+        HUDI,
+        ICE_BERG
+    }
+
+    private final BackendPolicy backendPolicy = new BackendPolicy();
+
+    private final ParamCreateContext context = new ParamCreateContext();
+
+    private List<TScanRangeLocations> scanRangeLocations;
+
+    private final HMSExternalTable hmsTable;
+
+    private ExternalFileScanProvider scanProvider;
+
+    /**
+     * External file scan node for an HMS table.
+     */
+    public ExternalFileScanNode(
+            PlanNodeId id,
+            TupleDescriptor desc,
+            String planNodeName) throws MetaNotFoundException {
+        super(id, desc, planNodeName, NodeType.BROKER_SCAN_NODE);
+
+        this.hmsTable = (HMSExternalTable) desc.getTable();
+
+        DLAType type = getDLAType();
+        switch (type) {
+            case HUDI:
+                this.scanProvider = new ExternalHudiScanProvider(this.hmsTable);
+                break;
+            case ICE_BERG:
+                this.scanProvider = new ExternalIcebergScanProvider(this.hmsTable);
+                break;
+            case HIVE:
+                this.scanProvider = new ExternalHiveScanProvider(this.hmsTable);
+                break;
+            default:
+                LOG.warn("Unknown DLA table type.");
+        }
+    }
+
+    private DLAType getDLAType() throws MetaNotFoundException {
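+        // Distinguish the lake format from HMS metadata: Iceberg tables carry a
+        // "table_type" parameter, Hudi tables use a "hoodie" input format.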
+        if (hmsTable.getRemoteTable().getParameters().containsKey("table_type")
+                && hmsTable.getRemoteTable().getParameters().get("table_type").equalsIgnoreCase("ICEBERG")) {
+            return DLAType.ICE_BERG;
+        } else if (hmsTable.getRemoteTable().getSd().getInputFormat().toLowerCase().contains("hoodie")) {
+            return DLAType.HUDI;
+        } else {
+            return DLAType.HIVE;
+        }
+    }
+
+    @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+        backendPolicy.init();
+        initContext(context);
+    }
+
+    private void initContext(ParamCreateContext context) throws DdlException, MetaNotFoundException {
+        context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor();
+        context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        context.params = new TBrokerScanRangeParams();
+        if (scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) {
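+            // For text tables, take the delimiters from the Hive serde, falling back to Hive defaults.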
+            Map<String, String> serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
+            String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim"))
+                    ? HIVE_DEFAULT_COLUMN_SEPARATOR : serDeInfoParams.get("field.delim");
+            String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim"))
+                    ? HIVE_DEFAULT_LINE_DELIMITER : serDeInfoParams.get("line.delim");
+            context.params.setColumnSeparator(columnSeparator.getBytes(StandardCharsets.UTF_8)[0]);
+            context.params.setLineDelimiter(lineDelimiter.getBytes(StandardCharsets.UTF_8)[0]);
+            context.params.setColumnSeparatorStr(columnSeparator);
+            context.params.setLineDelimiterStr(lineDelimiter);
+            context.params.setColumnSeparatorLength(columnSeparator.getBytes(StandardCharsets.UTF_8).length);
+            context.params.setLineDelimiterLength(lineDelimiter.getBytes(StandardCharsets.UTF_8).length);
+        }
+
+        Map<String, SlotDescriptor> slotDescByName = Maps.newHashMap();
+
+        List<Column> columns = hmsTable.getBaseSchema(false);
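+        // Expose every source column as a nullable VARCHAR slot; finalizeParams() casts to the dest types later.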
+        for (Column column : columns) {
+            SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor);
+            slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+            slotDesc.setIsMaterialized(true);
+            slotDesc.setIsNullable(true);
+            slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR));
+            context.params.addToSrcSlotIds(slotDesc.getId().asInt());
+            slotDescByName.put(column.getName(), slotDesc);
+        }
+        context.slotDescByName = slotDescByName;
+    }
+
+    @Override
+    public void finalize(Analyzer analyzer) throws UserException {
+        try {
+            finalizeParams(context.slotDescByName, context.params, context.srcTupleDescriptor);
+            buildScanRange();
+        } catch (IOException e) {
+            LOG.error("Finalize failed.", e);
+            throw new UserException("Finalize failed.", e);
+        }
+    }
+
+    // If fileFormat is not null, we use fileFormat instead of checking the file's suffix
+    private void buildScanRange() throws UserException, IOException {
+        scanRangeLocations = Lists.newArrayList();
+        InputSplit[] inputSplits = scanProvider.getSplits(conjuncts);
+        if (inputSplits.length == 0) {
+            return;
+        }
+
+        THdfsParams hdfsParams = new THdfsParams();
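+        // Derive the filesystem prefix (e.g. "hdfs://host:port") by stripping the file path from the full URI.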
+        String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
+        String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
+        String fsName = fullPath.replace(filePath, "");
+        hdfsParams.setFsName(fsName);
+        List<String> partitionKeys = new ArrayList<>();
+        for (FieldSchema fieldSchema : hmsTable.getRemoteTable().getPartitionKeys()) {
+            partitionKeys.add(fieldSchema.getName());
+        }
+
+        for (InputSplit split : inputSplits) {
+            FileSplit fileSplit = (FileSplit) split;
+
+            TScanRangeLocations curLocations = newLocations(context.params);
+            List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
+                    partitionKeys);
+            int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size();
+
+            TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, partitionValuesFromPath,
+                    numberOfColumnsFromFile);
+            rangeDesc.setHdfsParams(hdfsParams);
+            rangeDesc.setReadByColumnDef(true);
+
+            curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
+            LOG.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId()
+                    + " with table split: " + fileSplit.getPath()
+                    + " (" + fileSplit.getStart() + "," + fileSplit.getLength() + ")");
+
+            // Keep this location if at least one range was attached
+            if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) {
+                scanRangeLocations.add(curLocations);
+            }
+        }
+    }
+
+    private TScanRangeLocations newLocations(TBrokerScanRangeParams params) {
+        // Generate one broker scan range
+        TBrokerScanRange brokerScanRange = new TBrokerScanRange();
+        brokerScanRange.setParams(params);
+        brokerScanRange.setBrokerAddresses(new ArrayList<>());
+
+        // Scan range
+        TScanRange scanRange = new TScanRange();
+        scanRange.setBrokerScanRange(brokerScanRange);
+
+        // Locations
+        TScanRangeLocations locations = new TScanRangeLocations();
+        locations.setScanRange(scanRange);
+
+        TScanRangeLocation location = new TScanRangeLocation();
+        Backend selectedBackend = backendPolicy.getNextBe();
+        location.setBackendId(selectedBackend.getId());
+        location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
+        locations.addToLocations(location);
+
+        return locations;
+    }
+
+    private TBrokerRangeDesc createBrokerRangeDesc(
+            FileSplit fileSplit,
+            List<String> columnsFromPath,
+            int numberOfColumnsFromFile) throws DdlException, MetaNotFoundException {
+        TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
+        rangeDesc.setFileType(scanProvider.getTableFileType());
+        rangeDesc.setFormatType(scanProvider.getTableFormatType());
+        rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
+        rangeDesc.setSplittable(true);
+        rangeDesc.setStartOffset(fileSplit.getStart());
+        rangeDesc.setSize(fileSplit.getLength());
+        rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile);
+        rangeDesc.setColumnsFromPath(columnsFromPath);
+        // set hdfs params for hdfs file type.
+        if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
+            BrokerUtil.generateHdfsParam(scanProvider.getTableProperties(), rangeDesc);
+        }
+        return rangeDesc;
+    }
+
+    private void finalizeParams(
+            Map<String, SlotDescriptor> slotDescByName,
+            TBrokerScanRangeParams params,
+            TupleDescriptor srcTupleDesc) throws UserException {
+        Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap();
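+        // Bind each dest slot to a source slot, a column default, or NULL; a missing non-nullable column is an error.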
+        for (SlotDescriptor destSlotDesc : desc.getSlots()) {
+            Expr expr;
+            SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName());
+            if (srcSlotDesc != null) {
+                destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
+                // If dest allows null, set the source to nullable
+                if (destSlotDesc.getColumn().isAllowNull()) {
+                    srcSlotDesc.setIsNullable(true);
+                }
+                expr = new SlotRef(srcSlotDesc);
+            } else {
+                Column column = destSlotDesc.getColumn();
+                if (column.getDefaultValue() != null) {
+                    expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue());
+                } else {
+                    if (column.isAllowNull()) {
+                        expr = NullLiteral.create(column.getType());
+                    } else {
+                        throw new AnalysisException("column has no source field, column=" + column.getName());
+                    }
+                }
+            }
+
+            expr = castToSlot(destSlotDesc, expr);
+            params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift());
+        }
+        params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans);
+        params.setDestTupleId(desc.getId().asInt());
+        params.setStrictMode(false);
+        params.setSrcTupleId(srcTupleDesc.getId().asInt());
+
+        // Need to recompute the memory layout after setting some slot descriptors to nullable
+        srcTupleDesc.computeStatAndMemLayout();
+    }
+
+    @Override
+    public int getNumInstances() {
+        return scanRangeLocations.size();
+    }
+
+    @Override
+    protected void toThrift(TPlanNode planNode) {
+        planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE);
+        TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt());
+        if (!preFilterConjuncts.isEmpty()) {
+            if (Config.enable_vectorized_load && vpreFilterConjunct != null) {
+                brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift());
+            } else {
+                for (Expr e : preFilterConjuncts) {
+                    brokerScanNode.addToPreFilterExprs(e.treeToThrift());
+                }
+            }
+        }
+        planNode.setBrokerScanNode(brokerScanNode);
+    }
+
+    @Override
+    public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
+        return scanRangeLocations;
+    }
+
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
+        String url;
+        try {
+            url = scanProvider.getMetaStoreUrl();
+        } catch (MetaNotFoundException e) {
+            LOG.warn("Can't get the metastore url", e);
+            url = "Can't get the metastore url.";
+        }
+        return prefix + "DATABASE: " + hmsTable.getDbName() + "\n"
+                + prefix + "TABLE: " + hmsTable.getName() + "\n"
+                + prefix + "HIVE URL: " + url + "\n";
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
new file mode 100644
index 0000000000..2fc626ab70
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An interface through which the file scan node obtains the information it needs.
+ */
+public interface ExternalFileScanProvider {
+    TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException;
+
+    TFileType getTableFileType();
+
+    String getMetaStoreUrl() throws MetaNotFoundException;
+
+    InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException;
+
+    Table getRemoteHiveTable() throws DdlException, MetaNotFoundException;
+
+    Map<String, String> getTableProperties() throws MetaNotFoundException;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
new file mode 100644
index 0000000000..979a5a29d3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.external.hive.util.HiveUtil;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A Hive scan provider that supplies scan information to the file scan node.
+ */
+public class ExternalHiveScanProvider implements ExternalFileScanProvider {
+    protected HMSExternalTable hmsTable;
+
+    public ExternalHiveScanProvider(HMSExternalTable hmsTable) {
+        this.hmsTable = hmsTable;
+    }
+
+    @Override
+    public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException {
+        TFileFormatType type = null;
+        String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
+        String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName);
+        if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) {
+            type = TFileFormatType.FORMAT_PARQUET;
+        } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) {
+            type = TFileFormatType.FORMAT_ORC;
+        } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) {
+            type = TFileFormatType.FORMAT_CSV_PLAIN;
+        }
+        return type;
+    }
+
+    @Override
+    public TFileType getTableFileType() {
+        return TFileType.FILE_HDFS;
+    }
+
+    @Override
+    public String getMetaStoreUrl() throws MetaNotFoundException {
+        return getTableProperties().get(HiveConf.ConfVars.METASTOREURIS.name());
+    }
+
+    @Override
+    public InputSplit[] getSplits(List<Expr> exprs)
+            throws IOException, UserException {
+        String splitsPath = getRemoteHiveTable().getSd().getLocation();
+        List<String> partitionKeys = getRemoteHiveTable().getPartitionKeys()
+                .stream().map(FieldSchema::getName).collect(Collectors.toList());
+
+        if (partitionKeys.size() > 0) {
+            ExprNodeGenericFuncDesc hivePartitionPredicate = extractHivePartitionPredicate(exprs, partitionKeys);
+
+            String metaStoreUris = getMetaStoreUrl();
+            List<Partition> hivePartitions = HiveMetaStoreClientHelper.getHivePartitions(
+                    metaStoreUris, getRemoteHiveTable(), hivePartitionPredicate);
+            if (!hivePartitions.isEmpty()) {
+                splitsPath = hivePartitions.stream().map(x -> x.getSd().getLocation())
+                        .collect(Collectors.joining(","));
+            }
+        }
+
+        String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
+
+        Configuration configuration = new Configuration();
+        InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
+        JobConf jobConf = new JobConf(configuration);
+        FileInputFormat.setInputPaths(jobConf, splitsPath);
+        return inputFormat.getSplits(jobConf, 0);
+    }
+
+
+    private ExprNodeGenericFuncDesc extractHivePartitionPredicate(List<Expr> conjuncts, List<String> partitionKeys)
+            throws DdlException {
+        ExprNodeGenericFuncDesc hivePartitionPredicate;
+        List<ExprNodeDesc> exprNodeDescs = new ArrayList<>();
+        for (Expr conjunct : conjuncts) {
+            ExprNodeGenericFuncDesc hiveExpr = HiveMetaStoreClientHelper.convertToHivePartitionExpr(
+                    conjunct, partitionKeys, hmsTable.getName());
+            if (hiveExpr != null) {
+                exprNodeDescs.add(hiveExpr);
+            }
+        }
+        int count = exprNodeDescs.size();
+
+        if (count >= 2) {
+            hivePartitionPredicate = HiveMetaStoreClientHelper.getCompoundExpr(exprNodeDescs, "and");
+        } else if (count == 1) {
+            hivePartitionPredicate = (ExprNodeGenericFuncDesc) exprNodeDescs.get(0);
+        } else {
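+            // No convertible conjuncts: fall back to a trivial "1 = 1" predicate that matches every partition.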
+            HiveMetaStoreClientHelper.ExprBuilder exprBuilder =
+                    new HiveMetaStoreClientHelper.ExprBuilder(hmsTable.getName());
+            hivePartitionPredicate = exprBuilder.val(TypeInfoFactory.intTypeInfo, 1)
+                    .val(TypeInfoFactory.intTypeInfo, 1)
+                    .pred("=", 2).build();
+        }
+        return hivePartitionPredicate;
+    }
+
+    @Override
+    public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException {
+        return hmsTable.getRemoteTable();
+    }
+
+    @Override
+    public Map<String, String> getTableProperties() throws MetaNotFoundException {
+        return hmsTable.getRemoteTable().getParameters();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java
new file mode 100644
index 0000000000..2951c1fb7a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.thrift.TFileFormatType;
+
+/**
+ * A file scan provider for Hudi.
+ * The Hudi provider extends the Hive provider since both use the InputFormat interface to get splits.
+ */
+public class ExternalHudiScanProvider extends ExternalHiveScanProvider {
+
+    public ExternalHudiScanProvider(HMSExternalTable hmsTable) {
+        super(hmsTable);
+    }
+
+    @Override
+    public TFileFormatType getTableFormatType() throws DdlException {
+        return TFileFormatType.FORMAT_PARQUET;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
new file mode 100644
index 0000000000..28ba3dad0a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.IcebergProperty;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.external.iceberg.HiveCatalog;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A file scan provider for Iceberg.
+ */
+public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
+
+    public ExternalIcebergScanProvider(HMSExternalTable hmsTable) {
+        super(hmsTable);
+    }
+
+    @Override
+    public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException {
+        TFileFormatType type;
+
+        String icebergFormat = getRemoteHiveTable().getParameters()
+                .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+        if (icebergFormat.equals("parquet")) {
+            type = TFileFormatType.FORMAT_PARQUET;
+        } else if (icebergFormat.equals("orc")) {
+            type = TFileFormatType.FORMAT_ORC;
+        } else {
+            throw new DdlException(String.format("Unsupported format name: %s for iceberg table.", icebergFormat));
+        }
+        return type;
+    }
+
+    @Override
+    public TFileType getTableFileType() {
+        return TFileType.FILE_HDFS;
+    }
+
+    @Override
+    public InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException {
+        List<Expression> expressions = new ArrayList<>();
+        for (Expr conjunct : exprs) {
+            Expression expression = IcebergUtils.convertToIcebergExpr(conjunct);
+            if (expression != null) {
+                expressions.add(expression);
+            }
+        }
+
+        org.apache.iceberg.Table table = getIcebergTable();
+        TableScan scan = table.newScan();
+        for (Expression predicate : expressions) {
+            scan = scan.filter(predicate);
+        }
+        List<FileSplit> splits = new ArrayList<>();
+
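+        // Re-split each planned file-scan task into 128 MB chunks.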
+        for (FileScanTask task : scan.planFiles()) {
+            for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) {
+                splits.add(new FileSplit(new Path(splitTask.file().path().toString()),
+                        splitTask.start(), splitTask.length(), new String[0]));
+            }
+        }
+        return splits.toArray(new InputSplit[0]);
+    }
+
+    private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
+        HiveCatalog hiveCatalog = new HiveCatalog();
+        hiveCatalog.initialize(new IcebergProperty(getTableProperties()));
+        return hiveCatalog.loadTable(TableIdentifier.of(hmsTable.getDbName(), hmsTable.getName()));
+    }
+}
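
For readers tracing the patch as a whole, the call sequence the planner is expected to drive the new node through condenses to the sketch below. It is illustrative only, assembled from the overrides above; tupleDesc, analyzer, the plan node id, and the node name string are assumed planner-side inputs, not identifiers defined in this commit:

    // Hypothetical driver: the constructor inspects HMS metadata and picks a
    // Hive, Hudi or Iceberg scan provider; init() selects backends and builds
    // the source tuple; finalize() maps dest slots and builds the scan ranges
    // the coordinator later fetches.
    ExternalFileScanNode scanNode = new ExternalFileScanNode(
            new PlanNodeId(0), tupleDesc, "EXTERNAL_FILE_SCAN_NODE");
    scanNode.init(analyzer);
    scanNode.finalize(analyzer);
    List<TScanRangeLocations> locations = scanNode.getScanRangeLocations(0);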


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org