Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/20 09:46:30 UTC
[doris] branch master updated: [feature-wip](multi-catalog) External file scan node (#9973)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8a8d24b4a6 [feature-wip](multi-catalog) External file scan node (#9973)
8a8d24b4a6 is described below
commit 8a8d24b4a6bd0aad2eeb612aff2c32aa8724b1de
Author: huangzhaowei <hu...@bytedance.com>
AuthorDate: Mon Jun 20 17:46:24 2022 +0800
[feature-wip](multi-catalog) External file scan node (#9973)
---
.../org/apache/doris/analysis/DescriptorTable.java | 2 +-
.../org/apache/doris/analysis/SlotDescriptor.java | 7 +-
.../java/org/apache/doris/analysis/SlotRef.java | 5 +-
.../java/org/apache/doris/analysis/TableRef.java | 2 +-
.../org/apache/doris/analysis/TupleDescriptor.java | 9 +-
.../org/apache/doris/catalog/IcebergTable.java | 1 -
.../doris/catalog/external/HMSExternalTable.java | 7 +
.../org/apache/doris/planner/HashJoinNode.java | 4 +-
.../planner/external/ExternalFileScanNode.java | 407 +++++++++++++++++++++
.../planner/external/ExternalFileScanProvider.java | 49 +++
.../planner/external/ExternalHiveScanProvider.java | 149 ++++++++
.../planner/external/ExternalHudiScanProvider.java | 38 ++
.../external/ExternalIcebergScanProvider.java | 105 ++++++
13 files changed, 768 insertions(+), 17 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
index 00139913d9..ed7ba00dd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
@@ -161,7 +161,7 @@ public class DescriptorTable {
// but its table has no id
if (tupleD.getTable() != null
&& tupleD.getTable().getId() >= 0) {
- referencedTbls.add(tupleD.getTable());
+ referencedTbls.add((Table) tupleD.getTable());
}
for (SlotDescriptor slotD : tupleD.getMaterializedSlots()) {
result.addToSlotDescriptors(slotD.toThrift());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index 9cdd0b23b7..1eb41ae6cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -329,10 +329,7 @@ public class SlotDescriptor {
}
public boolean isScanSlot() {
- Table table = parent.getTable();
- if ((table != null) && (table instanceof OlapTable)) {
- return true;
- }
- return false;
+ return parent.getTable() instanceof OlapTable;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
index 6e8017e3bd..706d82e79c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
@@ -373,7 +373,7 @@ public class SlotRef extends Expr {
expr.getTableIdToColumnNames(tableIdToColumnNames);
}
} else {
- Table table = desc.getParent().getTable();
+ Table table = (Table) desc.getParent().getTable();
if (table == null) {
// Maybe this column comes from inline view.
return;
@@ -390,8 +390,7 @@ public class SlotRef extends Expr {
public Table getTable() {
Preconditions.checkState(desc != null);
- Table table = desc.getParent().getTable();
- return table;
+ return (Table) desc.getParent().getTable();
}
public void setLabel(String label) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 1d15ea6fe4..107a9a3637 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -301,7 +301,7 @@ public class TableRef implements ParseNode, Writable {
}
public Table getTable() {
- return desc.getTable();
+ return (Table) desc.getTable();
}
public void setUsingClause(List<String> colNames) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java
index 5bfc261713..96aaa615cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java
@@ -23,6 +23,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.ColumnStats;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.thrift.TTupleDescriptor;
import com.google.common.base.Joiner;
@@ -45,7 +46,7 @@ public class TupleDescriptor {
private final ArrayList<SlotDescriptor> slots;
// underlying table, if there is one
- private Table table;
+ private TableIf table;
// underlying table, if there is one
private TableRef ref;
@@ -151,11 +152,11 @@ public class TupleDescriptor {
return null;
}
- public Table getTable() {
+ public TableIf getTable() {
return table;
}
- public void setTable(Table tbl) {
+ public void setTable(TableIf tbl) {
table = tbl;
}
@@ -352,7 +353,7 @@ public class TupleDescriptor {
if (slotDescriptor.getColumn() != null) {
TupleDescriptor parent = slotDescriptor.getParent();
Preconditions.checkState(parent != null);
- Table table = parent.getTable();
+ Table table = (Table) parent.getTable();
Preconditions.checkState(table != null);
Long tableId = table.getId();
Set<String> columnNames = tableIdToColumnNames.get(tableId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
index a575ab110b..eb748ed807 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
@@ -171,7 +171,6 @@ public class IcebergTable extends Table {
return fileFormat;
}
- // get the iceberg table instance, if table is not loaded, load it.
private org.apache.iceberg.Table getTable() throws Exception {
if (isLoaded.get()) {
Preconditions.checkNotNull(icebergTable);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 8c5ea18d06..c4167b89df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -131,4 +131,11 @@ public class HMSExternalTable extends ExternalTable {
}
return null;
}
+
+ /**
+ * Get the database name of the HMS table.
+ */
+ public String getDbName() {
+ return dbName;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 38a9e88e3c..9192c0981f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -363,13 +363,13 @@ public class HashJoinNode extends PlanNode {
}
public double lhsNumRows() {
- Table table = lhs.getParent().getTable();
+ Table table = (Table) lhs.getParent().getTable();
Preconditions.checkState(table instanceof OlapTable);
return ((OlapTable) (table)).getRowCount();
}
public double rhsNumRows() {
- Table table = rhs.getParent().getTable();
+ Table table = (Table) rhs.getParent().getTable();
Preconditions.checkState(table instanceof OlapTable);
return ((OlapTable) (table)).getRowCount();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
new file mode 100644
index 0000000000..8be039b30f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.mysql.privilege.UserProperty;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
+import org.apache.doris.thrift.TBrokerRangeDesc;
+import org.apache.doris.thrift.TBrokerScanNode;
+import org.apache.doris.thrift.TBrokerScanRange;
+import org.apache.doris.thrift.TBrokerScanRangeParams;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.THdfsParams;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeLocation;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * ExternalFileScanNode for file-based data sources; currently only Hive, Hudi and Iceberg are supported (see the usage sketch after this diff).
+ */
+public class ExternalFileScanNode extends ExternalScanNode {
+ private static final Logger LOG = LogManager.getLogger(ExternalFileScanNode.class);
+
+ private static final String HIVE_DEFAULT_COLUMN_SEPARATOR = "\001";
+
+ private static final String HIVE_DEFAULT_LINE_DELIMITER = "\n";
+
+ private static class ParamCreateContext {
+ public TBrokerScanRangeParams params;
+ public TupleDescriptor srcTupleDescriptor;
+ public Map<String, SlotDescriptor> slotDescByName;
+ }
+
+ private static class BackendPolicy {
+ private final List<Backend> backends = Lists.newArrayList();
+
+ private int nextBe = 0;
+
+ public void init() throws UserException {
+ Set<Tag> tags = Sets.newHashSet();
+ if (ConnectContext.get().getCurrentUserIdentity() != null) {
+ String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
+ tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(qualifiedUser);
+ if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+ throw new UserException("No valid resource tag for user: " + qualifiedUser);
+ }
+ } else {
+ LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer");
+ }
+
+ // scan node is used for query
+ BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
+ .needQueryAvailable()
+ .needLoadAvailable()
+ .addTags(tags)
+ .build();
+ for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) {
+ if (policy.isMatch(be)) {
+ backends.add(be);
+ }
+ }
+ if (backends.isEmpty()) {
+ throw new UserException("No available backends");
+ }
+ Random random = new Random(System.currentTimeMillis());
+ Collections.shuffle(backends, random);
+ }
+
+ public Backend getNextBe() {
+ Backend selectedBackend = backends.get(nextBe++);
+ nextBe = nextBe % backends.size();
+ return selectedBackend;
+ }
+ }
+
+ private enum DLAType {
+ HIVE,
+ HUDI,
+ ICE_BERG
+ }
+
+ private final BackendPolicy backendPolicy = new BackendPolicy();
+
+ private final ParamCreateContext context = new ParamCreateContext();
+
+ private List<TScanRangeLocations> scanRangeLocations;
+
+ private final HMSExternalTable hmsTable;
+
+ private ExternalFileScanProvider scanProvider;
+
+ /**
+ * External file scan node for an HMS table.
+ */
+ public ExternalFileScanNode(
+ PlanNodeId id,
+ TupleDescriptor desc,
+ String planNodeName) throws MetaNotFoundException {
+ super(id, desc, planNodeName, NodeType.BROKER_SCAN_NODE);
+
+ this.hmsTable = (HMSExternalTable) desc.getTable();
+
+ DLAType type = getDLAType();
+ switch (type) {
+ case HUDI:
+ this.scanProvider = new ExternalHudiScanProvider(this.hmsTable);
+ break;
+ case ICE_BERG:
+ this.scanProvider = new ExternalIcebergScanProvider(this.hmsTable);
+ break;
+ case HIVE:
+ this.scanProvider = new ExternalHiveScanProvider(this.hmsTable);
+ break;
+ default:
+ LOG.warn("Unknown table for dla.");
+ }
+ }
+
+ private DLAType getDLAType() throws MetaNotFoundException {
+ if (hmsTable.getRemoteTable().getParameters().containsKey("table_type")
+ && hmsTable.getRemoteTable().getParameters().get("table_type").equalsIgnoreCase("ICEBERG")) {
+ return DLAType.ICE_BERG;
+ } else if (hmsTable.getRemoteTable().getSd().getInputFormat().toLowerCase().contains("hoodie")) {
+ return DLAType.HUDI;
+ } else {
+ return DLAType.HIVE;
+ }
+ }
+
+ @Override
+ public void init(Analyzer analyzer) throws UserException {
+ super.init(analyzer);
+ backendPolicy.init();
+ initContext(context);
+ }
+
+ private void initContext(ParamCreateContext context) throws DdlException, MetaNotFoundException {
+ context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor();
+ context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+ context.params = new TBrokerScanRangeParams();
+ if (TFileFormatType.FORMAT_CSV_PLAIN.equals(scanProvider.getTableFormatType())) {
+ Map<String, String> serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
+ String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim"))
+ ? HIVE_DEFAULT_COLUMN_SEPARATOR : serDeInfoParams.get("field.delim");
+ String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim"))
+ ? HIVE_DEFAULT_LINE_DELIMITER : serDeInfoParams.get("line.delim");
+ context.params.setColumnSeparator(columnSeparator.getBytes(StandardCharsets.UTF_8)[0]);
+ context.params.setLineDelimiter(lineDelimiter.getBytes(StandardCharsets.UTF_8)[0]);
+ context.params.setColumnSeparatorStr(columnSeparator);
+ context.params.setLineDelimiterStr(lineDelimiter);
+ context.params.setColumnSeparatorLength(columnSeparator.getBytes(StandardCharsets.UTF_8).length);
+ context.params.setLineDelimiterLength(lineDelimiter.getBytes(StandardCharsets.UTF_8).length);
+ }
+
+ Map<String, SlotDescriptor> slotDescByName = Maps.newHashMap();
+
+ List<Column> columns = hmsTable.getBaseSchema(false);
+ for (Column column : columns) {
+ SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor);
+ slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+ slotDesc.setIsMaterialized(true);
+ slotDesc.setIsNullable(true);
+ slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR));
+ context.params.addToSrcSlotIds(slotDesc.getId().asInt());
+ slotDescByName.put(column.getName(), slotDesc);
+ }
+ context.slotDescByName = slotDescByName;
+ }
+
+ @Override
+ public void finalize(Analyzer analyzer) throws UserException {
+ try {
+ finalizeParams(context.slotDescByName, context.params, context.srcTupleDescriptor);
+ buildScanRange();
+ } catch (IOException e) {
+ LOG.error("Finalize failed.", e);
+ throw new UserException("Finalize failed.", e);
+ }
+ }
+
+ // If fileFormat is not null, use it instead of checking the file's suffix.
+ private void buildScanRange() throws UserException, IOException {
+ scanRangeLocations = Lists.newArrayList();
+ InputSplit[] inputSplits = scanProvider.getSplits(conjuncts);
+ if (0 == inputSplits.length) {
+ return;
+ }
+
+ THdfsParams hdfsParams = new THdfsParams();
+ String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
+ String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
+ String fsName = fullPath.replace(filePath, "");
+ hdfsParams.setFsName(fsName);
+ List<String> partitionKeys = new ArrayList<>();
+ for (FieldSchema fieldSchema : hmsTable.getRemoteTable().getPartitionKeys()) {
+ partitionKeys.add(fieldSchema.getName());
+ }
+
+ for (InputSplit split : inputSplits) {
+ FileSplit fileSplit = (FileSplit) split;
+
+ TScanRangeLocations curLocations = newLocations(context.params);
+ List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
+ partitionKeys);
+ int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size();
+
+ TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, partitionValuesFromPath,
+ numberOfColumnsFromFile);
+ rangeDesc.setHdfsParams(hdfsParams);
+ rangeDesc.setReadByColumnDef(true);
+
+ curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
+ Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId()
+ + " with table split: " + fileSplit.getPath()
+ + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")");
+
+ // Add this location only if at least one range was attached.
+ if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) {
+ scanRangeLocations.add(curLocations);
+ }
+ }
+ }
+
+ private TScanRangeLocations newLocations(TBrokerScanRangeParams params) {
+ // Generate one broker scan range
+ TBrokerScanRange brokerScanRange = new TBrokerScanRange();
+ brokerScanRange.setParams(params);
+ brokerScanRange.setBrokerAddresses(new ArrayList<>());
+
+ // Scan range
+ TScanRange scanRange = new TScanRange();
+ scanRange.setBrokerScanRange(brokerScanRange);
+
+ // Locations
+ TScanRangeLocations locations = new TScanRangeLocations();
+ locations.setScanRange(scanRange);
+
+ TScanRangeLocation location = new TScanRangeLocation();
+ Backend selectedBackend = backendPolicy.getNextBe();
+ location.setBackendId(selectedBackend.getId());
+ location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
+ locations.addToLocations(location);
+
+ return locations;
+ }
+
+ private TBrokerRangeDesc createBrokerRangeDesc(
+ FileSplit fileSplit,
+ List<String> columnsFromPath,
+ int numberOfColumnsFromFile) throws DdlException, MetaNotFoundException {
+ TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
+ rangeDesc.setFileType(scanProvider.getTableFileType());
+ rangeDesc.setFormatType(scanProvider.getTableFormatType());
+ rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
+ rangeDesc.setSplittable(true);
+ rangeDesc.setStartOffset(fileSplit.getStart());
+ rangeDesc.setSize(fileSplit.getLength());
+ rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile);
+ rangeDesc.setColumnsFromPath(columnsFromPath);
+ // set hdfs params for hdfs file type.
+ if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
+ BrokerUtil.generateHdfsParam(scanProvider.getTableProperties(), rangeDesc);
+ }
+ return rangeDesc;
+ }
+
+ private void finalizeParams(
+ Map<String, SlotDescriptor> slotDescByName,
+ TBrokerScanRangeParams params,
+ TupleDescriptor srcTupleDesc) throws UserException {
+ Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap();
+ for (SlotDescriptor destSlotDesc : desc.getSlots()) {
+ Expr expr;
+ SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName());
+ if (srcSlotDesc != null) {
+ destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
+ // If dest allows null, set the source slot to nullable
+ if (destSlotDesc.getColumn().isAllowNull()) {
+ srcSlotDesc.setIsNullable(true);
+ }
+ expr = new SlotRef(srcSlotDesc);
+ } else {
+ Column column = destSlotDesc.getColumn();
+ if (column.getDefaultValue() != null) {
+ expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue());
+ } else {
+ if (column.isAllowNull()) {
+ expr = NullLiteral.create(column.getType());
+ } else {
+ throw new AnalysisException("column has no source field, column=" + column.getName());
+ }
+ }
+ }
+
+ expr = castToSlot(destSlotDesc, expr);
+ params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift());
+ }
+ params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans);
+ params.setDestTupleId(desc.getId().asInt());
+ params.setStrictMode(false);
+ params.setSrcTupleId(srcTupleDesc.getId().asInt());
+
+ // Need to recompute the memory layout after setting some slot descriptors to nullable
+ srcTupleDesc.computeStatAndMemLayout();
+ }
+
+ @Override
+ public int getNumInstances() {
+ return scanRangeLocations.size();
+ }
+
+ @Override
+ protected void toThrift(TPlanNode planNode) {
+ planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE);
+ TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt());
+ if (!preFilterConjuncts.isEmpty()) {
+ if (Config.enable_vectorized_load && vpreFilterConjunct != null) {
+ brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift());
+ } else {
+ for (Expr e : preFilterConjuncts) {
+ brokerScanNode.addToPreFilterExprs(e.treeToThrift());
+ }
+ }
+ }
+ planNode.setBrokerScanNode(brokerScanNode);
+ }
+
+ @Override
+ public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
+ return scanRangeLocations;
+ }
+
+ @Override
+ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
+ String url;
+ try {
+ url = scanProvider.getMetaStoreUrl();
+ } catch (MetaNotFoundException e) {
+ LOG.warn("Can't get url error", e);
+ url = "Can't get url error.";
+ }
+ return prefix + "DATABASE: " + hmsTable.getDbName() + "\n"
+ + prefix + "TABLE: " + hmsTable.getName() + "\n"
+ + prefix + "HIVE URL: " + url + "\n";
+ }
+}
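
For illustration, a minimal sketch of how a planner might wire up the new scan node for
an HMS-backed table; the PlanNodeId value and the analyzer setup are assumptions for the
example, not part of this patch:

    // Hypothetical usage of ExternalFileScanNode (ids and names are illustrative).
    TupleDescriptor desc = analyzer.getDescTbl().createTupleDescriptor();
    desc.setTable(hmsExternalTable);  // TupleDescriptor now accepts any TableIf
    ExternalFileScanNode scanNode =
            new ExternalFileScanNode(new PlanNodeId(0), desc, "EXTERNAL_FILE_SCAN_NODE");
    scanNode.init(analyzer);      // selects backends, builds the source tuple and params
    scanNode.finalize(analyzer);  // maps dest slots to source slots and builds scan ranges

The constructor picks the provider from HMS metadata: "table_type" = ICEBERG selects the
Iceberg provider, a "hoodie" input format selects Hudi, and anything else falls back to
plain Hive.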
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
new file mode 100644
index 0000000000..2fc626ab70
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An interface for the file scan node to obtain the information it needs (see the sketch after this diff).
+ */
+public interface ExternalFileScanProvider {
+ TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException;
+
+ TFileType getTableFileType();
+
+ String getMetaStoreUrl() throws MetaNotFoundException;
+
+ InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException;
+
+ Table getRemoteHiveTable() throws DdlException, MetaNotFoundException;
+
+ Map<String, String> getTableProperties() throws MetaNotFoundException;
+}
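
To make the contract concrete, a hedged sketch of a hypothetical provider for plain text
files on HDFS; the class name and behavior are inventions for the example, and the imports
are as in ExternalHiveScanProvider below:

    // Hypothetical implementation of ExternalFileScanProvider, not part of this patch.
    public class ExampleTextScanProvider implements ExternalFileScanProvider {
        private final HMSExternalTable hmsTable; // assumed source of all metadata

        public ExampleTextScanProvider(HMSExternalTable hmsTable) {
            this.hmsTable = hmsTable;
        }

        @Override
        public TFileFormatType getTableFormatType() {
            return TFileFormatType.FORMAT_CSV_PLAIN;
        }

        @Override
        public TFileType getTableFileType() {
            return TFileType.FILE_HDFS;
        }

        @Override
        public String getMetaStoreUrl() throws MetaNotFoundException {
            return getTableProperties().get("hive.metastore.uris");
        }

        @Override
        public InputSplit[] getSplits(List<Expr> exprs) throws IOException {
            // Let Hadoop's TextInputFormat enumerate splits under the table location.
            JobConf jobConf = new JobConf(new Configuration());
            FileInputFormat.setInputPaths(jobConf, hmsTable.getRemoteTable().getSd().getLocation());
            TextInputFormat inputFormat = new TextInputFormat();
            inputFormat.configure(jobConf);
            return inputFormat.getSplits(jobConf, 0);
        }

        @Override
        public Table getRemoteHiveTable() {
            return hmsTable.getRemoteTable();
        }

        @Override
        public Map<String, String> getTableProperties() throws MetaNotFoundException {
            return hmsTable.getRemoteTable().getParameters();
        }
    }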
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
new file mode 100644
index 0000000000..979a5a29d3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.external.hive.util.HiveUtil;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A Hive scan provider that supplies scan information to the file scan node (see the usage sketch after this diff).
+ */
+public class ExternalHiveScanProvider implements ExternalFileScanProvider {
+ protected HMSExternalTable hmsTable;
+
+ public ExternalHiveScanProvider(HMSExternalTable hmsTable) {
+ this.hmsTable = hmsTable;
+ }
+
+ @Override
+ public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException {
+ TFileFormatType type = null;
+ String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
+ String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName);
+ if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) {
+ type = TFileFormatType.FORMAT_PARQUET;
+ } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) {
+ type = TFileFormatType.FORMAT_ORC;
+ } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) {
+ type = TFileFormatType.FORMAT_CSV_PLAIN;
+ }
+ return type;
+ }
+
+ @Override
+ public TFileType getTableFileType() {
+ return TFileType.FILE_HDFS;
+ }
+
+ @Override
+ public String getMetaStoreUrl() throws MetaNotFoundException {
+ return getTableProperties().get(HiveConf.ConfVars.METASTOREURIS.varname);
+ }
+
+ @Override
+ public InputSplit[] getSplits(List<Expr> exprs)
+ throws IOException, UserException {
+ String splitsPath = getRemoteHiveTable().getSd().getLocation();
+ List<String> partitionKeys = getRemoteHiveTable().getPartitionKeys()
+ .stream().map(FieldSchema::getName).collect(Collectors.toList());
+
+ if (partitionKeys.size() > 0) {
+ ExprNodeGenericFuncDesc hivePartitionPredicate = extractHivePartitionPredicate(exprs, partitionKeys);
+
+ String metaStoreUris = getMetaStoreUrl();
+ List<Partition> hivePartitions = HiveMetaStoreClientHelper.getHivePartitions(
+ metaStoreUris, getRemoteHiveTable(), hivePartitionPredicate);
+ if (!hivePartitions.isEmpty()) {
+ splitsPath = hivePartitions.stream().map(x -> x.getSd().getLocation())
+ .collect(Collectors.joining(","));
+ }
+ }
+
+ String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
+
+ Configuration configuration = new Configuration();
+ InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
+ JobConf jobConf = new JobConf(configuration);
+ FileInputFormat.setInputPaths(jobConf, splitsPath);
+ return inputFormat.getSplits(jobConf, 0);
+ }
+
+
+ private ExprNodeGenericFuncDesc extractHivePartitionPredicate(List<Expr> conjuncts, List<String> partitionKeys)
+ throws DdlException {
+ ExprNodeGenericFuncDesc hivePartitionPredicate;
+ List<ExprNodeDesc> exprNodeDescs = new ArrayList<>();
+ for (Expr conjunct : conjuncts) {
+ ExprNodeGenericFuncDesc hiveExpr = HiveMetaStoreClientHelper.convertToHivePartitionExpr(
+ conjunct, partitionKeys, hmsTable.getName());
+ if (hiveExpr != null) {
+ exprNodeDescs.add(hiveExpr);
+ }
+ }
+ int count = exprNodeDescs.size();
+
+ if (count >= 2) {
+ hivePartitionPredicate = HiveMetaStoreClientHelper.getCompoundExpr(exprNodeDescs, "and");
+ } else if (count == 1) {
+ hivePartitionPredicate = (ExprNodeGenericFuncDesc) exprNodeDescs.get(0);
+ } else {
+ HiveMetaStoreClientHelper.ExprBuilder exprBuilder =
+ new HiveMetaStoreClientHelper.ExprBuilder(hmsTable.getName());
+ hivePartitionPredicate = exprBuilder.val(TypeInfoFactory.intTypeInfo, 1)
+ .val(TypeInfoFactory.intTypeInfo, 1)
+ .pred("=", 2).build();
+ }
+ return hivePartitionPredicate;
+ }
+
+ @Override
+ public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException {
+ return hmsTable.getRemoteTable();
+ }
+
+ @Override
+ public Map<String, String> getTableProperties() throws MetaNotFoundException {
+ return hmsTable.getRemoteTable().getParameters();
+ }
+}
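
A quick hedged sketch of driving the Hive provider directly, e.g. from a test; the empty
conjunct list is an assumption for the example:

    // Hypothetical usage of ExternalHiveScanProvider, not part of this patch.
    ExternalHiveScanProvider provider = new ExternalHiveScanProvider(hmsTable);
    InputSplit[] splits = provider.getSplits(Collections.emptyList());
    for (InputSplit split : splits) {
        FileSplit fileSplit = (FileSplit) split;
        System.out.println(fileSplit.getPath()
                + " start=" + fileSplit.getStart() + " length=" + fileSplit.getLength());
    }

With no usable conjuncts on a partitioned table, extractHivePartitionPredicate falls back
to the trivially true "1 = 1" predicate, so every partition is listed.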
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java
new file mode 100644
index 0000000000..2951c1fb7a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHudiScanProvider.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.thrift.TFileFormatType;
+
+/**
+ * A file scan provider for Hudi.
+ * It extends the Hive provider since both use the InputFormat interface to get splits (see the sketch after this diff).
+ */
+public class ExternalHudiScanProvider extends ExternalHiveScanProvider {
+
+ public ExternalHudiScanProvider(HMSExternalTable hmsTable) {
+ super(hmsTable);
+ }
+
+ @Override
+ public TFileFormatType getTableFormatType() throws DdlException {
+ return TFileFormatType.FORMAT_PARQUET;
+ }
+}
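
Since the Hudi provider only pins the format and inherits the Hive split path, a hedged
usage sketch is short; hmsTable and conjuncts are assumed to come from the surrounding code:

    // Hypothetical: Hudi reuses Hive's getSplits() but always reports Parquet.
    ExternalHudiScanProvider hudiProvider = new ExternalHudiScanProvider(hmsTable);
    TFileFormatType format = hudiProvider.getTableFormatType(); // always FORMAT_PARQUET
    InputSplit[] splits = hudiProvider.getSplits(conjuncts);    // inherited Hive split logic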
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
new file mode 100644
index 0000000000..28ba3dad0a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.IcebergProperty;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.external.iceberg.HiveCatalog;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A file scan provider for Iceberg (see the scan API sketch after this diff).
+ */
+public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
+
+ public ExternalIcebergScanProvider(HMSExternalTable hmsTable) {
+ super(hmsTable);
+ }
+
+ @Override
+ public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException {
+ TFileFormatType type;
+
+ String icebergFormat = getRemoteHiveTable().getParameters()
+ .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ if (icebergFormat.equals("parquet")) {
+ type = TFileFormatType.FORMAT_PARQUET;
+ } else if (icebergFormat.equals("orc")) {
+ type = TFileFormatType.FORMAT_ORC;
+ } else {
+ throw new DdlException(String.format("Unsupported format name: %s for iceberg table.", icebergFormat));
+ }
+ return type;
+ }
+
+ @Override
+ public TFileType getTableFileType() {
+ return TFileType.FILE_HDFS;
+ }
+
+ @Override
+ public InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException {
+ List<Expression> expressions = new ArrayList<>();
+ for (Expr conjunct : exprs) {
+ Expression expression = IcebergUtils.convertToIcebergExpr(conjunct);
+ if (expression != null) {
+ expressions.add(expression);
+ }
+ }
+
+ org.apache.iceberg.Table table = getIcebergTable();
+ TableScan scan = table.newScan();
+ for (Expression predicate : expressions) {
+ scan = scan.filter(predicate);
+ }
+ List<FileSplit> splits = new ArrayList<>();
+
+ for (FileScanTask task : scan.planFiles()) {
+ for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) {
+ splits.add(new FileSplit(new Path(splitTask.file().path().toString()),
+ splitTask.start(), splitTask.length(), new String[0]));
+ }
+ }
+ return splits.toArray(new InputSplit[0]);
+ }
+
+ private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
+ HiveCatalog hiveCatalog = new HiveCatalog();
+ hiveCatalog.initialize(new IcebergProperty(getTableProperties()));
+ return hiveCatalog.loadTable(TableIdentifier.of(hmsTable.getDbName(), hmsTable.getName()));
+ }
+}
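
The split enumeration above maps directly onto Iceberg's public scan API; a standalone
hedged sketch follows (the catalog, identifiers and the event_date column are assumptions;
imports as in the file above plus org.apache.iceberg.expressions.Expressions):

    // Hypothetical standalone use of the Iceberg scan API mirrored by getSplits() above.
    org.apache.iceberg.Table icebergTable =
            hiveCatalog.loadTable(TableIdentifier.of("example_db", "example_tbl"));
    TableScan scan = icebergTable.newScan()
            .filter(Expressions.equal("event_date", "2022-06-20")); // assumed column
    for (FileScanTask task : scan.planFiles()) {
        for (FileScanTask splitTask : task.split(128L * 1024 * 1024)) { // 128 MB target size
            System.out.println(splitTask.file().path()
                    + " start=" + splitTask.start() + " length=" + splitTask.length());
        }
    }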