You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/07 16:13:07 UTC
[doris] 13/13: [Feature](multi-catalog)support paimon catalog (#19681)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git
commit f4f86b07d842a242edd1cf4abe1ede8019c89584
Author: yuxuan-luo <11...@users.noreply.github.com>
AuthorDate: Tue Jun 6 15:08:30 2023 +0800
[Feature](multi-catalog)support paimon catalog (#19681)
CREATE CATALOG paimon_n2 PROPERTIES (
"dfs.ha.namenodes.HDFS1006531" = "nn2,nn1",
"dfs.namenode.rpc-address.HDFS1006531.nn2" = "172.16.65.xx:4007",
"dfs.namenode.rpc-address.HDFS1006531.nn1" = "172.16.65.xx:4007",
"hive.metastore.uris" = "thrift://172.16.65.xx:7004",
"type" = "paimon",
"dfs.nameservices" = "HDFS1006531",
"hadoop.username" = "hadoop",
"paimon.catalog.type" = "hms",
"warehouse" = "hdfs://HDFS1006531/data/paimon1",
"dfs.client.failover.proxy.provider.HDFS1006531" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
---
be/src/vec/CMakeLists.txt | 1 +
be/src/vec/exec/scan/paimon_reader.cpp | 84 ++++++++++
be/src/vec/exec/scan/paimon_reader.h | 76 +++++++++
be/src/vec/exec/scan/vfile_scanner.cpp | 8 +
fe/fe-core/pom.xml | 19 +++
.../java/org/apache/doris/catalog/TableIf.java | 4 +-
.../catalog/external/PaimonExternalDatabase.java | 72 ++++++++
.../catalog/external/PaimonExternalTable.java | 132 +++++++++++++++
.../apache/doris/datasource/CatalogFactory.java | 4 +
.../apache/doris/datasource/ExternalCatalog.java | 3 +
.../apache/doris/datasource/InitCatalogLog.java | 1 +
.../apache/doris/datasource/InitDatabaseLog.java | 1 +
.../datasource/paimon/PaimonExternalCatalog.java | 103 ++++++++++++
.../paimon/PaimonHMSExternalCatalog.java | 105 ++++++++++++
.../property/constants/PaimonProperties.java} | 18 +-
.../glue/translator/PhysicalPlanTranslator.java | 4 +
.../org/apache/doris/persist/gson/GsonUtils.java | 8 +
.../apache/doris/planner/SingleNodePlanner.java | 4 +
.../doris/planner/external/FileQueryScanNode.java | 4 +
.../doris/planner/external/TableFormatType.java | 3 +-
.../planner/external/paimon/PaimonScanNode.java | 177 ++++++++++++++++++++
.../planner/external/paimon/PaimonSource.java | 64 +++++++
.../doris/planner/external/paimon/PaimonSplit.java | 65 +++++++
.../org/apache/doris/statistics/DeriveFactory.java | 1 +
.../apache/doris/statistics/StatisticalType.java | 1 +
fe/java-udf/pom.xml | 21 ++-
.../org/apache/doris/jni/PaimonJniScanner.java | 186 +++++++++++++++++++++
.../apache/doris/jni/vec/PaimonColumnValue.java | 131 +++++++++++++++
fe/pom.xml | 2 +-
gensrc/thrift/PlanNodes.thrift | 24 ++-
30 files changed, 1299 insertions(+), 27 deletions(-)
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 2bf3f245a1..bec6a747b5 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -352,6 +352,7 @@ set(VEC_FILES
exec/format/parquet/bool_rle_decoder.cpp
exec/jni_connector.cpp
exec/scan/jni_reader.cpp
+ exec/scan/paimon_reader.cpp
exec/scan/max_compute_jni_reader.cpp
)
diff --git a/be/src/vec/exec/scan/paimon_reader.cpp b/be/src/vec/exec/scan/paimon_reader.cpp
new file mode 100644
index 0000000000..906973d838
--- /dev/null
+++ b/be/src/vec/exec/scan/paimon_reader.cpp
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "paimon_reader.h"
+
+#include <map>
+#include <ostream>
+
+#include "runtime/descriptors.h"
+#include "runtime/types.h"
+#include "vec/core/types.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+
+// Builds the JNI connector used to read one Paimon split.
+//
+// The scanner parameters are copied from `range.table_format_params.paimon_params` into a
+// string map that is handed, together with the names of the slots in `file_slot_descs`, to a
+// JniConnector bound to the Java class org/apache/doris/jni/PaimonJniScanner.
+//
+// "split_byte" carries the address of the `paimon_split` buffer formatted as a decimal
+// integer; "length_byte" is copied verbatim from the range descriptor.
+// NOTE(review): the Java side presumably reads the serialized split through that address, so
+// `range` must outlive the scanner's use of it -- confirm against PaimonJniScanner.
+PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
+                                 RuntimeState* state, RuntimeProfile* profile,
+                                 const TFileRangeDesc& range)
+        : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
+    std::vector<std::string> column_names;
+    column_names.reserve(_file_slot_descs.size());
+    for (auto* desc : _file_slot_descs) {
+        // Construct the name in place instead of going through a temporary copy.
+        column_names.emplace_back(desc->col_name());
+    }
+
+    const auto& paimon_params = range.table_format_params.paimon_params;
+    std::map<String, String> params;
+    params["required_fields"] = paimon_params.paimon_column_names;
+    params["columns_types"] = paimon_params.paimon_column_types;
+    params["columns_id"] = paimon_params.paimon_column_ids;
+    params["hive.metastore.uris"] = paimon_params.hive_metastore_uris;
+    params["warehouse"] = paimon_params.warehouse;
+    params["db_name"] = paimon_params.db_name;
+    params["table_name"] = paimon_params.table_name;
+    params["length_byte"] = paimon_params.length_byte;
+    // The pointer value itself (not the bytes it points to) is what gets serialized here.
+    params["split_byte"] =
+            std::to_string(reinterpret_cast<int64_t>(paimon_params.paimon_split.data()));
+    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/jni/PaimonJniScanner", params,
+                                                    column_names);
+}
+
+// Fetches the next block from the Java scanner through the JNI connector.
+// As soon as the connector reports end-of-file, the connector is closed.
+Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof));
+    const bool reached_end = *eof;
+    if (!reached_end) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(_jni_connector->close());
+    return Status::OK();
+}
+
+// Reports every file slot as an available column by mapping its name to its type.
+// `missing_cols` is not modified.
+Status PaimonJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                                    std::unordered_set<std::string>* missing_cols) {
+    for (auto slot_it = _file_slot_descs.begin(); slot_it != _file_slot_descs.end(); ++slot_it) {
+        auto* slot = *slot_it;
+        name_to_type->emplace(slot->col_name(), slot->type());
+    }
+    return Status::OK();
+}
+
+// Remembers the column value ranges, initializes the JNI connector with them and then opens
+// the connector. The first failing step's status is returned.
+Status PaimonJniReader::init_reader(
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+    _colname_to_value_range = colname_to_value_range;
+    RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range));
+    Status open_status = _jni_connector->open(_state, _profile);
+    return open_status;
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/paimon_reader.h b/be/src/vec/exec/scan/paimon_reader.h
new file mode 100644
index 0000000000..be90d6d849
--- /dev/null
+++ b/be/src/vec/exec/scan/paimon_reader.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/jni_connector.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+struct TypeDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+
+/**
+ * GenericReader implementation that reads Paimon table data through JNI.
+ * The constructor creates a JniConnector for the Java class
+ * org/apache/doris/jni/PaimonJniScanner; init_reader() initializes and opens that
+ * connector, and get_next_block() fetches blocks from it, closing the connector once
+ * end-of-file is reported.
+ */
+class PaimonJniReader : public GenericReader {
+ ENABLE_FACTORY_CREATOR(PaimonJniReader);
+
+public:
+ // Collects the slot names and the Paimon parameters of `range` and builds the connector.
+ PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
+ RuntimeProfile* profile, const TFileRangeDesc& range);
+
+ ~PaimonJniReader() override = default;
+
+ // Reads the next block via the connector; closes the connector when *eof becomes true.
+ Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+ // Fills name_to_type from the file slot descriptors; missing_cols is left untouched.
+ Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+ std::unordered_set<std::string>* missing_cols) override;
+
+ // Stores the value ranges, then initializes and opens the connector.
+ Status init_reader(
+ std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+
+private:
+ // Held by reference: the vector passed to the constructor must outlive this reader.
+ const std::vector<SlotDescriptor*>& _file_slot_descs;
+ RuntimeState* _state;
+ RuntimeProfile* _profile;
+ // Not set by the constructor; assigned in init_reader().
+ std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+ std::unique_ptr<JniConnector> _jni_connector;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index b7f8119553..a539abc9f0 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -62,6 +62,7 @@
#include "vec/exec/format/table/iceberg_reader.h"
#include "vec/exec/scan/max_compute_jni_reader.h"
#include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/paimon_reader.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
@@ -600,6 +601,13 @@ Status VFileScanner::_get_next_reader() {
init_status = mc_reader->init_reader(_colname_to_value_range);
_cur_reader = std::move(mc_reader);
}
+ if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "paimon") {
+ _cur_reader =
+ PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, range);
+ init_status = ((PaimonJniReader*)(_cur_reader.get()))
+ ->init_reader(_colname_to_value_range);
+ }
break;
}
case TFileFormatType::FORMAT_PARQUET: {
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 4bce0193b4..db8e0b7ec1 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -34,6 +34,7 @@ under the License.
<fe_ut_parallel>1</fe_ut_parallel>
<antlr4.version>4.9.3</antlr4.version>
<awssdk.version>2.17.257</awssdk.version>
+ <paimon.version>0.4-SNAPSHOT</paimon.version>
</properties>
<profiles>
<profile>
@@ -529,6 +530,24 @@ under the License.
<artifactId>iceberg-aws</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-core</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-common</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-format</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index c79acc79df..95f8873c60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -134,7 +134,8 @@ public interface TableIf {
enum TableType {
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, @Deprecated HUDI, JDBC,
TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
- ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE, HUDI_EXTERNAL_TABLE;
+ ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE,
+ HUDI_EXTERNAL_TABLE;
public String toEngineName() {
switch (this) {
@@ -198,6 +199,7 @@ public interface TableIf {
case HMS_EXTERNAL_TABLE:
case ES_EXTERNAL_TABLE:
case ICEBERG_EXTERNAL_TABLE:
+ case PAIMON_EXTERNAL_TABLE:
return "EXTERNAL TABLE";
default:
return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
new file mode 100644
index 0000000000..ac6d6932c6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Database of a Paimon external catalog. Tables are represented by
+ * {@link PaimonExternalTable} and tracked in the name-to-id and id-to-table maps
+ * inherited from {@link ExternalDatabase}.
+ */
+public class PaimonExternalDatabase extends ExternalDatabase<PaimonExternalTable> implements GsonPostProcessable {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalDatabase.class);
+
+    /**
+     * @param extCatalog catalog this database belongs to
+     * @param id database id
+     * @param name database name
+     */
+    public PaimonExternalDatabase(ExternalCatalog extCatalog, Long id, String name) {
+        super(extCatalog, id, name, InitDatabaseLog.Type.PAIMON);
+    }
+
+    /**
+     * Creates the table object for {@code tableName}. The {@code catalog} argument is not used;
+     * the table is bound to this database's own catalog.
+     */
+    @Override
+    protected PaimonExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
+        return new PaimonExternalTable(tblId, tableName, name, (PaimonExternalCatalog) extCatalog);
+    }
+
+    @Override
+    public List<PaimonExternalTable> getTablesOnIdOrder() {
+        // Sort the name instead, because the id may change.
+        return getTables().stream().sorted(Comparator.comparing(TableIf::getName)).collect(Collectors.toList());
+    }
+
+    /**
+     * Removes {@code tableName} from both lookup maps. An unknown table only produces a warning.
+     */
+    @Override
+    public void dropTable(String tableName) {
+        LOG.debug("drop table [{}]", tableName);
+        makeSureInitialized();
+        Long tableId = tableNameToId.remove(tableName);
+        if (tableId == null) {
+            // Nothing is registered under this name, so there is no id to remove. Stop here:
+            // handing a null key to the id map would throw if that map rejects null keys.
+            LOG.warn("drop table [{}] failed", tableName);
+            return;
+        }
+        idToTbl.remove(tableId);
+    }
+
+    /**
+     * Registers a new table under {@code tableName} / {@code tableId} in both lookup maps.
+     */
+    @Override
+    public void createTable(String tableName, long tableId) {
+        LOG.debug("create table [{}]", tableName);
+        makeSureInitialized();
+        tableNameToId.put(tableName, tableId);
+        PaimonExternalTable table = new PaimonExternalTable(tableId, tableName, name,
+                (PaimonExternalCatalog) extCatalog);
+        idToTbl.put(tableId, table);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
new file mode 100644
index 0000000000..c821160dd4
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.thrift.THiveTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DecimalType;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Doris-side representation of a table that lives in a Paimon catalog. It resolves the
+ * underlying Paimon table lazily, derives the Doris schema from the Paimon schema and
+ * produces the thrift table descriptor.
+ */
+public class PaimonExternalTable extends ExternalTable {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class);
+
+    // Scale handed to createDatetimeV2Type for every Paimon timestamp column.
+    public static final int PAIMON_DATETIME_SCALE_MS = 3;
+    // Paimon table handle, resolved on first use by getOriginTable().
+    private Table originTable = null;
+
+    public PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) {
+        super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE);
+    }
+
+    /** Returns the catalog type reported by the owning {@link PaimonExternalCatalog}. */
+    public String getPaimonCatalogType() {
+        return ((PaimonExternalCatalog) catalog).getPaimonCatalogType();
+    }
+
+    @Override
+    protected synchronized void makeSureInitialized() {
+        super.makeSureInitialized();
+        if (!objectCreated) {
+            objectCreated = true;
+        }
+    }
+
+    /**
+     * Returns the underlying Paimon table, asking the catalog for it on first use.
+     * The result is {@code null} while the catalog cannot find the table; in that case the
+     * lookup is repeated on the next call.
+     */
+    public Table getOriginTable() {
+        if (originTable == null) {
+            originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name);
+        }
+        return originTable;
+    }
+
+    /**
+     * Builds the Doris column list from the fields of the Paimon table schema.
+     *
+     * @throws IllegalStateException if the Paimon table cannot be found in the catalog
+     * @throws IllegalArgumentException if a field has a type that cannot be mapped
+     */
+    @Override
+    public List<Column> initSchema() {
+        Table table = getOriginTable();
+        if (table == null) {
+            // getPaimonTable() yields null when the table does not exist; report that clearly
+            // instead of failing with a bare NullPointerException on the dereference below.
+            throw new IllegalStateException("Paimon table not found: " + dbName + "." + name);
+        }
+        TableSchema schema = ((AbstractFileStoreTable) table).schema();
+        List<DataField> columns = schema.fields();
+        List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+        for (DataField field : columns) {
+            tmpSchema.add(new Column(field.name(),
+                    paimonTypeToDorisType(field.type()), true, null, true, field.description(), true,
+                    field.id()));
+        }
+        return tmpSchema;
+    }
+
+    /**
+     * Maps a Paimon type root to the Doris type used for it. TIME columns map to
+     * {@code Type.UNSUPPORTED}; any type root not listed here is rejected.
+     */
+    private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
+        switch (dataType.getTypeRoot()) {
+            case BOOLEAN:
+                return Type.BOOLEAN;
+            case INTEGER:
+                return Type.INT;
+            case BIGINT:
+                return Type.BIGINT;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case VARCHAR:
+            case BINARY:
+            case CHAR:
+                return Type.STRING;
+            case DECIMAL:
+                DecimalType decimal = (DecimalType) dataType;
+                return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale());
+            case DATE:
+                return ScalarType.createDateV2Type();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return ScalarType.createDatetimeV2Type(PAIMON_DATETIME_SCALE_MS);
+            case TIME_WITHOUT_TIME_ZONE:
+                return Type.UNSUPPORTED;
+            default:
+                throw new IllegalArgumentException("Cannot transform unknown type: " + dataType.getTypeRoot());
+        }
+    }
+
+    protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) {
+        return paimonPrimitiveTypeToDorisType(type);
+    }
+
+    /**
+     * Describes this table to the backend as a hive table descriptor.
+     *
+     * @throws IllegalArgumentException if the catalog type is anything other than hms
+     */
+    @Override
+    public TTableDescriptor toThrift() {
+        List<Column> schema = getFullSchema();
+        // Constant-first comparison: a null catalog type ends up in the "not supported" branch
+        // instead of raising a NullPointerException.
+        if (PaimonExternalCatalog.PAIMON_HMS.equals(getPaimonCatalogType())) {
+            THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
+            TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0,
+                    getName(), dbName);
+            tTableDescriptor.setHiveTable(tHiveTable);
+            return tTableDescriptor;
+        } else {
+            throw new IllegalArgumentException("Currently only supports hms catalog,not support :"
+                    + getPaimonCatalogType());
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index f530bcc5f8..358b53a274 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.Resource;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;
+import org.apache.doris.datasource.paimon.PaimonHMSExternalCatalog;
import org.apache.doris.datasource.test.TestExternalCatalog;
import com.google.common.base.Strings;
@@ -122,6 +123,9 @@ public class CatalogFactory {
case "iceberg":
catalog = IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props, comment);
break;
+ case "paimon":
+ catalog = new PaimonHMSExternalCatalog(catalogId, name, resource, props, comment);
+ break;
case "max_compute":
catalog = new MaxComputeExternalCatalog(catalogId, name, resource, props, comment);
break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index a23842c9bd..cf2de86494 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.IcebergExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.catalog.external.MaxComputeExternalDatabase;
+import org.apache.doris.catalog.external.PaimonExternalDatabase;
import org.apache.doris.catalog.external.TestExternalDatabase;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.DdlException;
@@ -448,6 +449,8 @@ public abstract class ExternalCatalog
//return new HudiExternalDatabase(this, dbId, dbName);
case TEST:
return new TestExternalDatabase(this, dbId, dbName);
+ case PAIMON:
+ return new PaimonExternalDatabase(this, dbId, dbName);
default:
break;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
index ecc284b325..73fbeeb781 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
@@ -37,6 +37,7 @@ public class InitCatalogLog implements Writable {
ES,
JDBC,
ICEBERG,
+ PAIMON,
MAX_COMPUTE,
HUDI,
TEST,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
index a49dd5232c..14cd4410ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
@@ -39,6 +39,7 @@ public class InitDatabaseLog implements Writable {
JDBC,
MAX_COMPUTE,
HUDI,
+ PAIMON,
TEST,
UNKNOWN;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
new file mode 100644
index 0000000000..3c024fadfb
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.SessionContext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class of the Paimon external catalogs. It wraps a Paimon {@link Catalog} that concrete
+ * subclasses are expected to assign, and answers database/table lookups through it.
+ */
+public abstract class PaimonExternalCatalog extends ExternalCatalog {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalCatalog.class);
+    public static final String PAIMON_HMS = "hms";
+    // Catalog flavour reported by getPaimonCatalogType(); assigned by subclasses.
+    protected String paimonCatalogType;
+    // Underlying Paimon catalog; not assigned anywhere in this class.
+    protected Catalog catalog;
+
+    public PaimonExternalCatalog(long catalogId, String name, String comment) {
+        super(catalogId, name, InitCatalogLog.Type.PAIMON, comment);
+        this.type = "paimon";
+    }
+
+    @Override
+    protected void init() {
+        super.init();
+    }
+
+    /** Builds an HDFS configuration populated with the catalog's hadoop properties. */
+    protected Configuration getConfiguration() {
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
+        for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        return conf;
+    }
+
+    public Catalog getCatalog() {
+        makeSureInitialized();
+        return catalog;
+    }
+
+    public String getPaimonCatalogType() {
+        makeSureInitialized();
+        return paimonCatalogType;
+    }
+
+    protected List<String> listDatabaseNames() {
+        return new ArrayList<>(catalog.listDatabases());
+    }
+
+    @Override
+    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
+        makeSureInitialized();
+        return catalog.tableExists(Identifier.create(dbName, tblName));
+    }
+
+    /**
+     * Lists the tables of {@code dbName}.
+     *
+     * @return the table names, or an empty list (never {@code null}) when the database does
+     *         not exist in the Paimon catalog
+     */
+    @Override
+    public List<String> listTableNames(SessionContext ctx, String dbName) {
+        makeSureInitialized();
+        try {
+            return catalog.listTables(dbName);
+        } catch (Catalog.DatabaseNotExistException e) {
+            LOG.warn("DatabaseNotExistException", e);
+            // A missing database has no tables; hand back an empty list so that callers can
+            // iterate over the result without a null check.
+            return new ArrayList<>();
+        }
+    }
+
+    /**
+     * Looks up a table in the Paimon catalog.
+     *
+     * @return the Paimon table, or {@code null} when it does not exist (a warning is logged)
+     */
+    public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblName) {
+        makeSureInitialized();
+        org.apache.paimon.table.Table table = null;
+        try {
+            table = catalog.getTable(Identifier.create(dbName, tblName));
+        } catch (Catalog.TableNotExistException e) {
+            LOG.warn("TableNotExistException", e);
+        }
+        return table;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
new file mode 100644
index 0000000000..13775b0edf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+
+
+
+/**
+ * Paimon external catalog whose metadata is served by a Hive metastore.
+ */
+public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PaimonHMSExternalCatalog.class);
+    public static final String METASTORE = "metastore";
+    public static final String METASTORE_HIVE = "hive";
+    public static final String URI = "uri";
+    private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+            ConfigOptions.key("metastore.client.class")
+                    .stringType()
+                    .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+                    .withDescription(
+                            "Class name of Hive metastore client.\n"
+                                    + "NOTE: This class must directly implements "
+                                    + "org.apache.hadoop.hive.metastore.IMetaStoreClient.");
+
+    /**
+     * Stores the converted catalog properties and marks the catalog type as hms.
+     */
+    public PaimonHMSExternalCatalog(long catalogId, String name, String resource,
+            Map<String, String> props, String comment) {
+        super(catalogId, name, comment);
+        Map<String, String> metaProps = PropertyConverter.convertToMetaProperties(props);
+        catalogProperty = new CatalogProperty(resource, metaProps);
+        paimonCatalogType = PAIMON_HMS;
+    }
+
+    /**
+     * Creates the Paimon catalog from the configured metastore uris and warehouse location.
+     * An IOException raised while doing so is logged and rethrown as a RuntimeException.
+     */
+    @Override
+    protected void initLocalObjectsImpl() {
+        String hmsUris = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
+        String warehouseLocation = catalogProperty.getOrDefault(PaimonProperties.WAREHOUSE, "");
+
+        Options paimonOptions = new Options();
+        paimonOptions.set(PaimonProperties.WAREHOUSE, warehouseLocation);
+        // Currently, only supports hive
+        paimonOptions.set(METASTORE, METASTORE_HIVE);
+        paimonOptions.set(URI, hmsUris);
+
+        CatalogContext catalogContext = CatalogContext.create(paimonOptions, getConfiguration());
+        try {
+            catalog = create(catalogContext);
+        } catch (IOException e) {
+            LOG.warn("failed to create paimon external catalog ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Assembles a HiveCatalog: resolves the FileIO of the warehouse path, builds a HiveConf
+     * and overlays the catalog options, the metastore uri and the warehouse location on it.
+     */
+    private Catalog create(CatalogContext context) throws IOException {
+        String warehouse = context.options().get(CatalogOptions.WAREHOUSE);
+        FileIO fileIO = FileIO.get(new Path(warehouse), context);
+
+        String metastoreUri = context.options().get(CatalogOptions.URI);
+        HiveConf hiveConf = HiveCatalog.createHiveConf(
+                context.options().get(HiveCatalogOptions.HIVE_CONF_DIR),
+                context.options().get(HiveCatalogOptions.HADOOP_CONF_DIR));
+
+        // always using user-set parameters overwrite hive-site.xml parameters
+        for (Map.Entry<String, String> option : context.options().toMap().entrySet()) {
+            hiveConf.set(option.getKey(), option.getValue());
+        }
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri);
+        // set the warehouse location to the hiveConf
+        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, context.options().get(CatalogOptions.WAREHOUSE));
+
+        String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);
+        return new HiveCatalog(fileIO, hiveConf, clientClassName, context.options().toMap());
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
similarity index 70%
copy from fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
index 6fc5d69544..e372dd5788 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
@@ -15,20 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.planner.external;
+package org.apache.doris.datasource.property.constants;
-public enum TableFormatType {
- HIVE("hive"),
- ICEBERG("iceberg"),
- HUDI("hudi");
-
- private final String tableFormatType;
-
- TableFormatType(String tableFormatType) {
- this.tableFormatType = tableFormatType;
- }
-
- public String value() {
- return tableFormatType;
- }
+// Names of the catalog properties that are specific to Paimon catalogs.
+public class PaimonProperties {
+ // Key of the catalog property that holds the Paimon warehouse location.
+ public static final String WAREHOUSE = "warehouse";
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 5d2b3c8f36..03f95dff5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -47,6 +47,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.catalog.external.IcebergExternalTable;
+import org.apache.doris.catalog.external.PaimonExternalTable;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
@@ -157,6 +158,7 @@ import org.apache.doris.planner.UnionNode;
import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.planner.external.HudiScanNode;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
+import org.apache.doris.planner.external.paimon.PaimonScanNode;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TFetchOption;
@@ -717,6 +719,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
} else if (table instanceof IcebergExternalTable) {
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+ } else if (table instanceof PaimonExternalTable) {
+ scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
}
Preconditions.checkNotNull(scanNode);
fileScan.getConjuncts().stream()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index bf7c0a5484..413a96eac2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -51,6 +51,8 @@ import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalTable;
import org.apache.doris.catalog.external.MaxComputeExternalDatabase;
import org.apache.doris.catalog.external.MaxComputeExternalTable;
+import org.apache.doris.catalog.external.PaimonExternalDatabase;
+import org.apache.doris.catalog.external.PaimonExternalTable;
import org.apache.doris.common.util.RangeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.EsExternalCatalog;
@@ -63,6 +65,8 @@ import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonHMSExternalCatalog;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
import org.apache.doris.load.routineload.AbstractDataSourceProperties;
@@ -194,6 +198,8 @@ public class GsonUtils {
.registerSubtype(IcebergGlueExternalCatalog.class, IcebergGlueExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName())
+ .registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName())
+ .registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName())
.registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName());
// routine load data source
private static RuntimeTypeAdapterFactory<AbstractDataSourceProperties> rdsTypeAdapterFactory =
@@ -208,6 +214,7 @@ public class GsonUtils {
.registerSubtype(HMSExternalDatabase.class, HMSExternalDatabase.class.getSimpleName())
.registerSubtype(JdbcExternalDatabase.class, JdbcExternalDatabase.class.getSimpleName())
.registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName())
+ .registerSubtype(PaimonExternalDatabase.class, PaimonExternalDatabase.class.getSimpleName())
.registerSubtype(MaxComputeExternalDatabase.class, MaxComputeExternalDatabase.class.getSimpleName());
private static RuntimeTypeAdapterFactory<TableIf> tblTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
@@ -216,6 +223,7 @@ public class GsonUtils {
.registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName())
.registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName())
.registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName())
+ .registerSubtype(PaimonExternalTable.class, PaimonExternalTable.class.getSimpleName())
.registerSubtype(MaxComputeExternalTable.class, MaxComputeExternalTable.class.getSimpleName());
// runtime adapter for class "HeartbeatResponse"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 0ae4a35edb..97de7d82ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -73,6 +73,7 @@ import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.planner.external.HudiScanNode;
import org.apache.doris.planner.external.MaxComputeScanNode;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
+import org.apache.doris.planner.external.paimon.PaimonScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.statistics.StatisticalType;
@@ -2019,6 +2020,9 @@ public class SingleNodePlanner {
case ICEBERG_EXTERNAL_TABLE:
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
break;
+ case PAIMON_EXTERNAL_TABLE:
+ scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+ break;
case MAX_COMPUTE_EXTERNAL_TABLE:
// TODO: support max compute scan node
scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "MCScanNode",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 508b174fb5..4c794d083f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -38,6 +38,8 @@ import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.planner.external.iceberg.IcebergSplit;
+import org.apache.doris.planner.external.paimon.PaimonScanNode;
+import org.apache.doris.planner.external.paimon.PaimonSplit;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
@@ -279,6 +281,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
if (fileSplit instanceof IcebergSplit) {
// TODO: extract all data lake split to factory
IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit);
+ } else if (fileSplit instanceof PaimonSplit) {
+ PaimonScanNode.setPaimonParams(rangeDesc, (PaimonSplit) fileSplit);
}
// if (fileSplit instanceof HudiSplit) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
index 6fc5d69544..f97c8ea1ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
@@ -20,7 +20,8 @@ package org.apache.doris.planner.external;
public enum TableFormatType {
HIVE("hive"),
ICEBERG("iceberg"),
- HUDI("hudi");
+ HUDI("hudi"),
+ PAIMON("paimon");
private final String tableFormatType;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
new file mode 100644
index 0000000000..13da29b2bd
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.PaimonExternalTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3Util;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPaimonFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import avro.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scan node for Paimon external tables.
+ *
+ * <p>It plans the Paimon splits on the FE side and attaches, to every file range, the serialized
+ * split plus the column and catalog information the backend needs to read it
+ * (the file format reported to the backend is {@link TFileFormatType#FORMAT_JNI}).
+ */
+public class PaimonScanNode extends FileQueryScanNode {
+    // NOTE(review): this field is static, so it is shared by every PaimonScanNode and overwritten by
+    // each doInitialize(). Concurrent queries on different Paimon tables can observe each other's
+    // source. It is kept static only because the static setPaimonParams() entry point reads it;
+    // fixing this requires a coordinated change of that method's callers.
+    private static PaimonSource source = null;
+
+    public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
+        super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv);
+    }
+
+    /**
+     * Rejects views, prepares column filters and the backend policy, then binds this node to its
+     * Paimon table.
+     *
+     * @throws UserException if the target is an external view or initialization fails
+     */
+    @Override
+    protected void doInitialize() throws UserException {
+        ExternalTable table = (ExternalTable) desc.getTable();
+        if (table.isView()) {
+            throw new AnalysisException(
+                String.format("Querying external view '%s.%s' is not supported", table.getDbName(), table.getName()));
+        }
+        computeColumnFilter();
+        initBackendPolicy();
+        source = new PaimonSource((PaimonExternalTable) table, desc, columnNameToRange);
+        Preconditions.checkNotNull(source);
+        initSchemaParams();
+    }
+
+    /**
+     * Fills the Paimon specific part of a file range: the serialized split and its length, the
+     * comma separated names/types/ids of the scanned columns, and the metastore URI, warehouse,
+     * database and table needed to re-open the table on the reader side.
+     *
+     * @param rangeDesc   file range descriptor that receives the table format params
+     * @param paimonSplit split to serialize into the descriptor
+     */
+    public static void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
+        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+        tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
+        TPaimonFileDesc fileDesc = new TPaimonFileDesc();
+        // Serialize the split once and reuse the bytes for both the payload and its length;
+        // every getSerializableSplit() call re-serializes the whole split.
+        byte[] serializedSplit = paimonSplit.getSerializableSplit();
+        fileDesc.setPaimonSplit(serializedSplit);
+        fileDesc.setLengthByte(Integer.toString(serializedSplit.length));
+        //Paimon columnNames,columnTypes,columnIds that need to be transported into JNI
+        StringBuilder columnNamesBuilder = new StringBuilder();
+        StringBuilder columnTypesBuilder = new StringBuilder();
+        StringBuilder columnIdsBuilder = new StringBuilder();
+        // Lookup tables from Paimon field name to field id / field type string.
+        Map<String, Integer> paimonFieldsId = new HashMap<>();
+        Map<String, String> paimonFieldsType = new HashMap<>();
+        for (DataField field : ((AbstractFileStoreTable) source.getPaimonTable()).schema().fields()) {
+            paimonFieldsId.put(field.name(), field.id());
+            paimonFieldsType.put(field.name(), field.type().toString());
+        }
+        // NOTE(review): a slot whose column name is missing from the Paimon schema is emitted as the
+        // text "null" in the type and id lists — confirm whether that can happen (e.g. name casing).
+        boolean isFirst = true;
+        for (SlotDescriptor slot : source.getDesc().getSlots()) {
+            if (!isFirst) {
+                columnNamesBuilder.append(",");
+                columnTypesBuilder.append(",");
+                columnIdsBuilder.append(",");
+            }
+            String columnName = slot.getColumn().getName();
+            columnNamesBuilder.append(columnName);
+            columnTypesBuilder.append(paimonFieldsType.get(columnName));
+            columnIdsBuilder.append(paimonFieldsId.get(columnName));
+            isFirst = false;
+        }
+        fileDesc.setPaimonColumnIds(columnIdsBuilder.toString());
+        fileDesc.setPaimonColumnNames(columnNamesBuilder.toString());
+        fileDesc.setPaimonColumnTypes(columnTypesBuilder.toString());
+        fileDesc.setHiveMetastoreUris(source.getCatalog().getCatalogProperty().getProperties()
+                .get(HiveConf.ConfVars.METASTOREURIS.varname));
+        fileDesc.setWarehouse(source.getCatalog().getCatalogProperty().getProperties()
+                .get(PaimonProperties.WAREHOUSE));
+        fileDesc.setDbName(((PaimonExternalTable) source.getTargetTable()).getDbName());
+        fileDesc.setTableName(source.getTargetTable().getName());
+        tableFormatFileDesc.setPaimonParams(fileDesc);
+        rangeDesc.setTableFormatParams(tableFormatFileDesc);
+    }
+
+    /**
+     * Plans the table scan with Paimon and wraps every resulting split into a {@link PaimonSplit}
+     * located at the table path.
+     */
+    @Override
+    public List<Split> getSplits() throws UserException {
+        List<Split> splits = new ArrayList<>();
+        ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
+        List<org.apache.paimon.table.source.Split> paimonSplits = readBuilder.newScan().plan().splits();
+        // The table location is the same for every split, so resolve it once.
+        String tableLocation = ((AbstractFileStoreTable) source.getPaimonTable()).location().toString();
+        for (org.apache.paimon.table.source.Split split : paimonSplits) {
+            // NOTE(review): "tempDir" looks like a placeholder path for the input split — confirm
+            // that nothing on the reader side depends on it.
+            PaimonInputSplit inputSplit = new PaimonInputSplit(
+                    "tempDir",
+                    (DataSplit) split
+            );
+            PaimonSplit paimonSplit = new PaimonSplit(inputSplit, tableLocation);
+            paimonSplit.setTableFormatType(TableFormatType.PAIMON);
+            splits.add(paimonSplit);
+        }
+        return splits;
+    }
+
+    /**
+     * Maps the table location to the storage type the backend has to use.
+     *
+     * @throws DdlException if the location matches none of the known storage prefixes
+     */
+    @Override
+    public TFileType getLocationType() throws DdlException, MetaNotFoundException {
+        String location = ((AbstractFileStoreTable) source.getPaimonTable()).location().toString();
+        if (location != null && !location.isEmpty()) {
+            if (S3Util.isObjStorage(location)) {
+                return TFileType.FILE_S3;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
+                return TFileType.FILE_HDFS;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
+                return TFileType.FILE_LOCAL;
+            }
+        }
+        // The table is a Paimon table, so the message names it as such.
+        throw new DdlException("Unknown file location " + location
+            + " for paimon table " + source.getPaimonTable().name());
+    }
+
+    /** Paimon data is always handed to the backend as a JNI readable format. */
+    @Override
+    public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
+        return TFileFormatType.FORMAT_JNI;
+    }
+
+    /** Returns a copy of the partition keys of the Paimon table. */
+    @Override
+    public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
+        return new ArrayList<>(source.getPaimonTable().partitionKeys());
+    }
+
+    @Override
+    public TFileAttributes getFileAttributes() throws UserException {
+        return source.getFileAttributes();
+    }
+
+    @Override
+    public TableIf getTargetTable() {
+        return source.getTargetTable();
+    }
+
+    /** The location properties are the properties of the catalog that owns the table. */
+    @Override
+    public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
+        return source.getCatalog().getProperties();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
new file mode 100644
index 0000000000..2f55e30c08
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.PaimonExternalTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.thrift.TFileAttributes;
+
+import org.apache.paimon.table.Table;
+
+import java.util.Map;
+
+/**
+ * Read-only bundle of everything a Paimon scan needs about its target: the Doris external table,
+ * the underlying Paimon table and the tuple descriptor of the scan.
+ */
+public class PaimonSource {
+    private final TupleDescriptor desc;
+    private final PaimonExternalTable paimonExtTable;
+    private final Table originTable;
+
+    /**
+     * @param table             Doris side external table; its origin table is resolved here
+     * @param desc              tuple descriptor of the scan
+     * @param columnNameToRange accepted for the caller's convenience, not used by this class
+     */
+    public PaimonSource(PaimonExternalTable table, TupleDescriptor desc,
+            Map<String, ColumnRange> columnNameToRange) {
+        this.desc = desc;
+        this.paimonExtTable = table;
+        this.originTable = table.getOriginTable();
+    }
+
+    /** Doris external table the scan targets. */
+    public TableIf getTargetTable() {
+        return paimonExtTable;
+    }
+
+    /** Paimon table obtained from the external table at construction time. */
+    public Table getPaimonTable() {
+        return originTable;
+    }
+
+    /** Tuple descriptor handed in at construction time. */
+    public TupleDescriptor getDesc() {
+        return desc;
+    }
+
+    /** Catalog that owns the external table. */
+    public ExternalCatalog getCatalog() {
+        return paimonExtTable.getCatalog();
+    }
+
+    /** Returns a new, empty set of file attributes on every call. */
+    public TFileAttributes getFileAttributes() throws UserException {
+        return new TFileAttributes();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
new file mode 100644
index 0000000000..e36740f0fd
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.planner.external.FileSplit;
+import org.apache.doris.planner.external.TableFormatType;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A {@link FileSplit} that wraps one Paimon input split together with the table format type that
+ * is reported for it.
+ */
+public class PaimonSplit extends FileSplit {
+    private PaimonInputSplit split;
+    private TableFormatType tableFormatType;
+
+    /**
+     * @param split Paimon input split carried by this split
+     * @param path  path handed to the {@link FileSplit} base class; the remaining base class
+     *              arguments are passed as 0 / null
+     */
+    public PaimonSplit(PaimonInputSplit split, String path) {
+        super(new Path(path), 0, 0, 0, null, null);
+        this.split = split;
+    }
+
+    public PaimonInputSplit getSplit() {
+        return split;
+    }
+
+    public void setSplit(PaimonInputSplit split) {
+        this.split = split;
+    }
+
+    public TableFormatType getTableFormatType() {
+        return tableFormatType;
+    }
+
+    public void setTableFormatType(TableFormatType tableFormatType) {
+        this.tableFormatType = tableFormatType;
+    }
+
+    /**
+     * Serializes the wrapped split with its own {@code write} method.
+     *
+     * <p>The split is serialized again on every call, so callers that need both the bytes and
+     * their length should keep the returned array.
+     *
+     * @return the serialized form of the split
+     * @throws RuntimeException if the split cannot be written; returning the partially written
+     *                          bytes instead would only fail later, when the split is read back
+     */
+    public byte[] getSerializableSplit() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream output = new DataOutputStream(baos)) {
+            split.write(output);
+        } catch (IOException e) {
+            throw new RuntimeException("failed to serialize paimon split", e);
+        }
+        return baos.toByteArray();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
index 35391191c7..ba22e067a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
@@ -51,6 +51,7 @@ public class DeriveFactory {
case ES_SCAN_NODE:
case HIVE_SCAN_NODE:
case ICEBERG_SCAN_NODE:
+ case PAIMON_SCAN_NODE:
case INTERSECT_NODE:
case SCHEMA_SCAN_NODE:
case STREAM_LOAD_SCAN_NODE:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index 3a4a283c79..67dd9bb054 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -31,6 +31,7 @@ public enum StatisticalType {
HASH_JOIN_NODE,
HIVE_SCAN_NODE,
ICEBERG_SCAN_NODE,
+ PAIMON_SCAN_NODE,
HUDI_SCAN_NODE,
TVF_SCAN_NODE,
INTERSECT_NODE,
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index 66d0123795..bcebf3871b 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -37,6 +37,7 @@ under the License.
<presto.hadoop.version>2.7.4-11</presto.hadoop.version>
<presto.hive.version>3.0.0-8</presto.hive.version>
<hudi.version>0.12.2</hudi.version>
+ <paimon.version>0.4-SNAPSHOT</paimon.version>
</properties>
<dependencies>
@@ -50,6 +51,21 @@ under the License.
<artifactId>fe-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-bundle</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-hive-connector-2.3</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+ <dependency>
+ <artifactId>hive-common</artifactId>
+ <groupId>org.apache.hive</groupId>
+ <version>2.3.9</version>
+ </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
@@ -162,10 +178,7 @@ under the License.
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.doris</groupId>
- <artifactId>hive-catalog-shade</artifactId>
- </dependency>
+
</dependencies>
<build>
<finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java
new file mode 100644
index 0000000000..03c8b6564e
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.utils.OffHeap;
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.PaimonColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+
+public class PaimonJniScanner extends JniScanner {
+ private static final Logger LOG = Logger.getLogger(PaimonJniScanner.class);
+
+ private final String metastoreUris;
+ private final String warehouse;
+ private final String dbName;
+ private final String tblName;
+ private final String[] ids;
+ private final long splitAddress;
+ private final int lengthByte;
+ private PaimonInputSplit paimonInputSplit;
+ private Table table;
+ private RecordReader<InternalRow> reader;
+ private final PaimonColumnValue columnValue = new PaimonColumnValue();
+
+ public PaimonJniScanner(int batchSize, Map<String, String> params) {
+ metastoreUris = params.get("hive.metastore.uris");
+ warehouse = params.get("warehouse");
+ splitAddress = Long.parseLong(params.get("split_byte"));
+ lengthByte = Integer.parseInt(params.get("length_byte"));
+ LOG.info("splitAddress:" + splitAddress);
+ LOG.info("lengthByte:" + lengthByte);
+ dbName = params.get("db_name");
+ tblName = params.get("table_name");
+ String[] requiredFields = params.get("required_fields").split(",");
+ String[] types = params.get("columns_types").split(",");
+ ids = params.get("columns_id").split(",");
+ ColumnType[] columnTypes = new ColumnType[types.length];
+ for (int i = 0; i < types.length; i++) {
+ columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+ }
+ ScanPredicate[] predicates = new ScanPredicate[0];
+ if (params.containsKey("push_down_predicates")) {
+ long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+ if (predicatesAddress != 0) {
+ predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+ LOG.info("MockJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
+ }
+ }
+ initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+ }
+
+ @Override
+ public void open() throws IOException {
+ getCatalog();
+ // deserialize it into split
+ byte[] splitByte = new byte[lengthByte];
+ OffHeap.copyMemory(null, splitAddress, splitByte, OffHeap.BYTE_ARRAY_OFFSET, lengthByte);
+ ByteArrayInputStream bais = new ByteArrayInputStream(splitByte);
+ DataInputStream input = new DataInputStream(bais);
+ try {
+ paimonInputSplit.readFields(input);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableRead read = readBuilder.newRead();
+ reader = read.createReader(paimonInputSplit.split());
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ protected int getNext() throws IOException {
+ int rows = 0;
+ try {
+ RecordReader.RecordIterator batch;
+ while ((batch = reader.readBatch()) != null) {
+ Object record;
+ while ((record = batch.next()) != null) {
+ columnValue.setOffsetRow((ColumnarRow) record);
+ for (int i = 0; i < ids.length; i++) {
+ columnValue.setIdx(Integer.parseInt(ids[i]));
+ appendData(i, columnValue);
+ }
+ rows++;
+ }
+ batch.releaseBatch();
+ }
+ } catch (IOException e) {
+ LOG.warn("failed to getNext columnValue ", e);
+ throw new RuntimeException(e);
+ }
+ return rows;
+ }
+
+ private Catalog create(CatalogContext context) throws IOException {
+ Path warehousePath = new Path(context.options().get(CatalogOptions.WAREHOUSE));
+ FileIO fileIO;
+ fileIO = FileIO.get(warehousePath, context);
+ String uri = context.options().get(CatalogOptions.URI);
+ String hiveConfDir = context.options().get(HiveCatalogOptions.HIVE_CONF_DIR);
+ String hadoopConfDir = context.options().get(HiveCatalogOptions.HADOOP_CONF_DIR);
+ HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir);
+
+ // always using user-set parameters overwrite hive-site.xml parameters
+ context.options().toMap().forEach(hiveConf::set);
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
+ // set the warehouse location to the hiveConf
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, context.options().get(CatalogOptions.WAREHOUSE));
+
+ String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);
+
+ return new HiveCatalog(fileIO, hiveConf, clientClassName, context.options().toMap());
+ }
+
+ private void getCatalog() {
+ paimonInputSplit = new PaimonInputSplit();
+ Options options = new Options();
+ options.set("warehouse", warehouse);
+ // Currently, only supports hive
+ options.set("metastore", "hive");
+ options.set("uri", metastoreUris);
+ CatalogContext context = CatalogContext.create(options);
+ try {
+ Catalog catalog = create(context);
+ table = catalog.getTable(Identifier.create(dbName, tblName));
+ } catch (IOException | Catalog.TableNotExistException e) {
+ LOG.warn("failed to create paimon external catalog ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+ ConfigOptions.key("metastore.client.class")
+ .stringType()
+ .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+ .withDescription(
+ "Class name of Hive metastore client.\n"
+ + "NOTE: This class must directly implements "
+ + "org.apache.hadoop.hive.metastore.IMetaStoreClient.");
+}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java
new file mode 100644
index 0000000000..3d8bb5e42e
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni.vec;
+
+import org.apache.paimon.data.columnar.ColumnarRow;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+
+/**
+ * {@link ColumnValue} implementation that reads one field of a Paimon {@link ColumnarRow}.
+ *
+ * <p>The object is mutable: {@link #setOffsetRow(ColumnarRow)} selects the row and
+ * {@link #setIdx(int)} selects the field position; every getter then reads that field.
+ */
+public class PaimonColumnValue implements ColumnValue {
+    private int idx;
+    private ColumnarRow record;
+
+    public PaimonColumnValue() {
+    }
+
+    /** Selects the field position within the current row that the getters read. */
+    public void setIdx(int idx) {
+        this.idx = idx;
+    }
+
+    /** Selects the row that the getters read from. */
+    public void setOffsetRow(ColumnarRow record) {
+        this.record = record;
+    }
+
+    @Override
+    public boolean getBoolean() {
+        return record.getBoolean(idx);
+    }
+
+    @Override
+    public byte getByte() {
+        return record.getByte(idx);
+    }
+
+    @Override
+    public short getShort() {
+        return record.getShort(idx);
+    }
+
+    @Override
+    public int getInt() {
+        return record.getInt(idx);
+    }
+
+    @Override
+    public float getFloat() {
+        return record.getFloat(idx);
+    }
+
+    @Override
+    public long getLong() {
+        return record.getLong(idx);
+    }
+
+    @Override
+    public double getDouble() {
+        return record.getDouble(idx);
+    }
+
+    /**
+     * Returns the field widened from a 32-bit int.
+     * NOTE(review): values wider than int cannot be represented this way - confirm which
+     * Paimon type is expected to reach this getter.
+     */
+    @Override
+    public BigInteger getBigInteger() {
+        return BigInteger.valueOf(record.getInt(idx));
+    }
+
+    /**
+     * Returns the field converted from its double reading.
+     * NOTE(review): going through double can lose decimal precision; reading the native
+     * decimal needs the column's precision and scale, which this class does not carry.
+     */
+    @Override
+    public BigDecimal getDecimal() {
+        return BigDecimal.valueOf(getDouble());
+    }
+
+    @Override
+    public String getString() {
+        return record.getString(idx).toString();
+    }
+
+    /**
+     * Reads the field as a millisecond-precision timestamp and returns its calendar date.
+     * NOTE(review): the conversion uses a hard-coded UTC+8 offset - confirm whether the
+     * session time zone should be used instead.
+     */
+    @Override
+    public LocalDate getDate() {
+        return Instant.ofEpochMilli(record.getTimestamp(idx, 3)
+                .getMillisecond()).atZone(ZoneOffset.ofHours(8)).toLocalDate();
+    }
+
+    /**
+     * Reads the field as a millisecond-precision timestamp and returns it as a local
+     * date-time.
+     * NOTE(review): the conversion uses a hard-coded UTC+8 offset - confirm whether the
+     * session time zone should be used instead.
+     */
+    @Override
+    public LocalDateTime getDateTime() {
+        return Instant.ofEpochMilli(record.getTimestamp(idx, 3)
+                .getMillisecond()).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
+    }
+
+    /**
+     * Reports whether the selected field of the current row is NULL.
+     * Previously this returned {@code true} unconditionally, which marked every value as NULL.
+     */
+    @Override
+    public boolean isNull() {
+        return record.isNullAt(idx);
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return record.getBinary(idx);
+    }
+
+    /** No-op: nothing is added to {@code values}. */
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+
+    }
+
+    /** No-op: nothing is added to {@code keys} or {@code values}. */
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+
+    }
+
+    /** No-op: nothing is added to {@code values}. */
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+
+    }
+}
diff --git a/fe/pom.xml b/fe/pom.xml
index e2da4103d3..5cb5f36482 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -195,7 +195,7 @@ under the License.
<doris.home>${fe.dir}/../</doris.home>
<revision>1.2-SNAPSHOT</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <doris.hive.catalog.shade.version>1.0.3-SNAPSHOT</doris.hive.catalog.shade.version>
+ <doris.hive.catalog.shade.version>1.0.4-SNAPSHOT</doris.hive.catalog.shade.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!--plugin parameters-->
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 9c98cd4d28..d68007e69a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -186,8 +186,8 @@ struct TBrokerScanRangeParams {
1: required i8 column_separator;
2: required i8 line_delimiter;
- // We construct one line in file to a tuple. And each field of line
- // correspond to a slot in this tuple.
+ // We construct one line in file to a tuple. And each field of line
+ // correspond to a slot in this tuple.
// src_tuple_id is the tuple id of the input file
3: required Types.TTupleId src_tuple_id
// src_slot_ids is the slot_ids of the input file
@@ -288,6 +288,19 @@ struct TIcebergFileDesc {
5: optional Exprs.TExpr file_select_conjunct;
}
+// Paimon-specific description of a file scan range; carried in
+// TTableFormatFileDesc.paimon_params. Every field is optional.
+struct TPaimonFileDesc {
+ // presumably the serialized Paimon split to read - confirm against the FE producer
+ 1: optional binary paimon_split
+ // column ids / types / names are each packed into a single string;
+ // NOTE(review): the separator format is not visible here - confirm with the reader
+ 2: optional string paimon_column_ids
+ 3: optional string paimon_column_types
+ 4: optional string paimon_column_names
+ 5: optional string hive_metastore_uris
+ 6: optional string warehouse
+ 7: optional string db_name
+ 8: optional string table_name
+ // NOTE(review): a length carried as a string; presumably the byte length of
+ // paimon_split - confirm
+ 9: optional string length_byte
+}
+
+
struct THudiFileDesc {
1: optional string basePath;
2: optional string dataFilePath;
@@ -300,6 +313,7 @@ struct TTableFormatFileDesc {
1: optional string table_format_type
2: optional TIcebergFileDesc iceberg_params
3: optional THudiFileDesc hudi_params
+ 4: optional TPaimonFileDesc paimon_params
}
struct TFileScanRangeParams {
@@ -566,7 +580,7 @@ struct TSortInfo {
4: optional list<Exprs.TExpr> sort_tuple_slot_exprs
// Indicates the nullable info of sort_tuple_slot_exprs is changed after substitute by child's smap
- 5: optional list<bool> slot_exprs_nullability_changed_flags
+ 5: optional list<bool> slot_exprs_nullability_changed_flags
// Indicates whether topn query using two phase read
6: optional bool use_two_phase_read
}
@@ -606,7 +620,7 @@ struct TEqJoinCondition {
// right-hand side of "<a> = <b>"
2: required Exprs.TExpr right;
// operator of equal join
- 3: optional Opcodes.TExprOpcode opcode;
+ 3: optional Opcodes.TExprOpcode opcode;
}
enum TJoinOp {
@@ -709,7 +723,7 @@ enum TAggregationOp {
DENSE_RANK,
ROW_NUMBER,
LAG,
- HLL_C,
+ HLL_C,
BITMAP_UNION,
NTILE,
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org