You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/07 16:13:07 UTC

[doris] 13/13: [Feature](multi-catalog)support paimon catalog (#19681)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f4f86b07d842a242edd1cf4abe1ede8019c89584
Author: yuxuan-luo <11...@users.noreply.github.com>
AuthorDate: Tue Jun 6 15:08:30 2023 +0800

    [Feature](multi-catalog)support paimon catalog (#19681)
    
    CREATE CATALOG paimon_n2 PROPERTIES (
    "dfs.ha.namenodes.HDFS1006531" = "nn2,nn1",
    "dfs.namenode.rpc-address.HDFS1006531.nn2" = "172.16.65.xx:4007",
    "dfs.namenode.rpc-address.HDFS1006531.nn1" = "172.16.65.xx:4007",
    "hive.metastore.uris" = "thrift://172.16.65.xx:7004",
    "type" = "paimon",
    "dfs.nameservices" = "HDFS1006531",
    "hadoop.username" = "hadoop",
    "paimon.catalog.type" = "hms",
    "warehouse" = "hdfs://HDFS1006531/data/paimon1",
    "dfs.client.failover.proxy.provider.HDFS1006531" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
---
 be/src/vec/CMakeLists.txt                          |   1 +
 be/src/vec/exec/scan/paimon_reader.cpp             |  84 ++++++++++
 be/src/vec/exec/scan/paimon_reader.h               |  76 +++++++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |   8 +
 fe/fe-core/pom.xml                                 |  19 +++
 .../java/org/apache/doris/catalog/TableIf.java     |   4 +-
 .../catalog/external/PaimonExternalDatabase.java   |  72 ++++++++
 .../catalog/external/PaimonExternalTable.java      | 132 +++++++++++++++
 .../apache/doris/datasource/CatalogFactory.java    |   4 +
 .../apache/doris/datasource/ExternalCatalog.java   |   3 +
 .../apache/doris/datasource/InitCatalogLog.java    |   1 +
 .../apache/doris/datasource/InitDatabaseLog.java   |   1 +
 .../datasource/paimon/PaimonExternalCatalog.java   | 103 ++++++++++++
 .../paimon/PaimonHMSExternalCatalog.java           | 105 ++++++++++++
 .../property/constants/PaimonProperties.java}      |  18 +-
 .../glue/translator/PhysicalPlanTranslator.java    |   4 +
 .../org/apache/doris/persist/gson/GsonUtils.java   |   8 +
 .../apache/doris/planner/SingleNodePlanner.java    |   4 +
 .../doris/planner/external/FileQueryScanNode.java  |   4 +
 .../doris/planner/external/TableFormatType.java    |   3 +-
 .../planner/external/paimon/PaimonScanNode.java    | 177 ++++++++++++++++++++
 .../planner/external/paimon/PaimonSource.java      |  64 +++++++
 .../doris/planner/external/paimon/PaimonSplit.java |  65 +++++++
 .../org/apache/doris/statistics/DeriveFactory.java |   1 +
 .../apache/doris/statistics/StatisticalType.java   |   1 +
 fe/java-udf/pom.xml                                |  21 ++-
 .../org/apache/doris/jni/PaimonJniScanner.java     | 186 +++++++++++++++++++++
 .../apache/doris/jni/vec/PaimonColumnValue.java    | 131 +++++++++++++++
 fe/pom.xml                                         |   2 +-
 gensrc/thrift/PlanNodes.thrift                     |  24 ++-
 30 files changed, 1299 insertions(+), 27 deletions(-)

diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 2bf3f245a1..bec6a747b5 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -352,6 +352,7 @@ set(VEC_FILES
   exec/format/parquet/bool_rle_decoder.cpp
   exec/jni_connector.cpp
   exec/scan/jni_reader.cpp
+  exec/scan/paimon_reader.cpp
   exec/scan/max_compute_jni_reader.cpp
   )
 
diff --git a/be/src/vec/exec/scan/paimon_reader.cpp b/be/src/vec/exec/scan/paimon_reader.cpp
new file mode 100644
index 0000000000..906973d838
--- /dev/null
+++ b/be/src/vec/exec/scan/paimon_reader.cpp
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "paimon_reader.h"
+
+#include <map>
+#include <ostream>
+
+#include "runtime/descriptors.h"
+#include "runtime/types.h"
+#include "vec/core/types.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+
+// Prepares the JNI connector for one file range of a Paimon table. The connector is created
+// for the Java class org/apache/doris/jni/PaimonJniScanner and receives the slot column names
+// plus a string map built from the range's paimon_params.
+PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
+                                 RuntimeState* state, RuntimeProfile* profile,
+                                 const TFileRangeDesc& range)
+        : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
+    // Column list for the connector: one entry per file slot, in slot order.
+    std::vector<std::string> column_names;
+    for (auto* slot : _file_slot_descs) {
+        column_names.push_back(slot->col_name());
+    }
+
+    const auto& paimon = range.table_format_params.paimon_params;
+    std::map<String, String> params;
+    // Where the table lives.
+    params["hive.metastore.uris"] = paimon.hive_metastore_uris;
+    params["warehouse"] = paimon.warehouse;
+    params["db_name"] = paimon.db_name;
+    params["table_name"] = paimon.table_name;
+    // Which columns to read.
+    params["required_fields"] = paimon.paimon_column_names;
+    params["columns_types"] = paimon.paimon_column_types;
+    params["columns_id"] = paimon.paimon_column_ids;
+    // The split itself: "split_byte" is the address of the paimon_split buffer rendered as a
+    // decimal integer. NOTE(review): presumably the Java scanner reads "length_byte" bytes from
+    // that address, so `range` has to stay alive while the scanner uses it -- confirm.
+    params["length_byte"] = paimon.length_byte;
+    const auto split_address = reinterpret_cast<int64_t>(paimon.paimon_split.data());
+    params["split_byte"] = std::to_string(split_address);
+
+    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/jni/PaimonJniScanner", params,
+                                                    column_names);
+}
+
+// Fetches the next block from the JNI connector. When the connector reports end-of-file the
+// connector is closed before returning; any error from either call is propagated.
+Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof));
+    if (!*eof) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(_jni_connector->close());
+    return Status::OK();
+}
+
+// Reports every file slot as an available column (name -> type). `missing_cols` is not touched.
+Status PaimonJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                                    std::unordered_set<std::string>* missing_cols) {
+    const size_t slot_count = _file_slot_descs.size();
+    for (size_t idx = 0; idx < slot_count; ++idx) {
+        SlotDescriptor* slot = _file_slot_descs[idx];
+        name_to_type->emplace(slot->col_name(), slot->type());
+    }
+    return Status::OK();
+}
+
+// Remembers the column value ranges, initializes the JNI connector with them and then opens
+// the connector. The first failing step's status is returned.
+Status PaimonJniReader::init_reader(
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+    _colname_to_value_range = colname_to_value_range;
+    const Status init_status = _jni_connector->init(_colname_to_value_range);
+    if (!init_status.ok()) {
+        return init_status;
+    }
+    return _jni_connector->open(_state, _profile);
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/paimon_reader.h b/be/src/vec/exec/scan/paimon_reader.h
new file mode 100644
index 0000000000..be90d6d849
--- /dev/null
+++ b/be/src/vec/exec/scan/paimon_reader.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/jni_connector.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+struct TypeDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+
+/**
+ * GenericReader implementation that reads a Paimon table split through a JniConnector.
+ * The connector is created for the Java class org/apache/doris/jni/PaimonJniScanner and is
+ * parameterized from the paimon_params carried by the TFileRangeDesc passed to the
+ * constructor.
+ *
+ * Typical call order: construct, init_reader(), then get_next_block() until *eof is set.
+ */
+class PaimonJniReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(PaimonJniReader);
+
+public:
+    // Builds the JNI connector. `file_slot_descs` is stored by reference (see
+    // _file_slot_descs), so the caller's vector must outlive this reader.
+    PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
+                    RuntimeProfile* profile, const TFileRangeDesc& range);
+
+    ~PaimonJniReader() override = default;
+
+    // Reads the next block from the connector; the connector is closed once *eof is set.
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    // Fills `name_to_type` with the name and type of every file slot; `missing_cols` is
+    // left untouched.
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    // Stores the column value ranges, initializes the connector with them and opens it.
+    Status init_reader(
+            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+
+private:
+    // Reference to the caller-owned slot descriptors (not a copy).
+    const std::vector<SlotDescriptor*>& _file_slot_descs;
+    RuntimeState* _state;
+    RuntimeProfile* _profile;
+    // Set by init_reader(); not owned.
+    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+    std::unique_ptr<JniConnector> _jni_connector;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index b7f8119553..a539abc9f0 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -62,6 +62,7 @@
 #include "vec/exec/format/table/iceberg_reader.h"
 #include "vec/exec/scan/max_compute_jni_reader.h"
 #include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/paimon_reader.h"
 #include "vec/exec/scan/vscan_node.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
@@ -600,6 +601,13 @@ Status VFileScanner::_get_next_reader() {
                 init_status = mc_reader->init_reader(_colname_to_value_range);
                 _cur_reader = std::move(mc_reader);
             }
+            if (range.__isset.table_format_params &&
+                range.table_format_params.table_format_type == "paimon") {
+                _cur_reader =
+                        PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, range);
+                init_status = ((PaimonJniReader*)(_cur_reader.get()))
+                                      ->init_reader(_colname_to_value_range);
+            }
             break;
         }
         case TFileFormatType::FORMAT_PARQUET: {
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 4bce0193b4..db8e0b7ec1 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -34,6 +34,7 @@ under the License.
         <fe_ut_parallel>1</fe_ut_parallel>
         <antlr4.version>4.9.3</antlr4.version>
         <awssdk.version>2.17.257</awssdk.version>
+        <paimon.version>0.4-SNAPSHOT</paimon.version>
     </properties>
     <profiles>
         <profile>
@@ -529,6 +530,24 @@ under the License.
             <artifactId>iceberg-aws</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-core</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-format</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>glue</artifactId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index c79acc79df..95f8873c60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -134,7 +134,8 @@ public interface TableIf {
     enum TableType {
         MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, @Deprecated HUDI, JDBC,
         TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
-        ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE, HUDI_EXTERNAL_TABLE;
+        ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE,
+        HUDI_EXTERNAL_TABLE;
 
         public String toEngineName() {
             switch (this) {
@@ -198,6 +199,7 @@ public interface TableIf {
                 case HMS_EXTERNAL_TABLE:
                 case ES_EXTERNAL_TABLE:
                 case ICEBERG_EXTERNAL_TABLE:
+                case PAIMON_EXTERNAL_TABLE:
                     return "EXTERNAL TABLE";
                 default:
                     return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
new file mode 100644
index 0000000000..ac6d6932c6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Database of a Paimon external catalog. Its tables are {@link PaimonExternalTable} instances
+ * bound to the owning {@link PaimonExternalCatalog}.
+ */
+public class PaimonExternalDatabase extends ExternalDatabase<PaimonExternalTable> implements GsonPostProcessable {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalDatabase.class);
+
+    /**
+     * @param extCatalog the catalog this database belongs to
+     * @param id database id
+     * @param name database name
+     */
+    public PaimonExternalDatabase(ExternalCatalog extCatalog, Long id, String name) {
+        super(extCatalog, id, name, InitDatabaseLog.Type.PAIMON);
+    }
+
+    /**
+     * Creates the table object for {@code tableName}. The table is bound to this database's
+     * own catalog ({@code extCatalog}); the {@code catalog} argument is not used.
+     */
+    @Override
+    protected PaimonExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
+        return new PaimonExternalTable(tblId, tableName, name, (PaimonExternalCatalog) extCatalog);
+    }
+
+    /**
+     * Returns the tables sorted by name (not by id, despite the method name).
+     */
+    @Override
+    public List<PaimonExternalTable> getTablesOnIdOrder() {
+        // Sort the name instead, because the id may change.
+        return getTables().stream().sorted(Comparator.comparing(TableIf::getName)).collect(Collectors.toList());
+    }
+
+    /**
+     * Removes {@code tableName} from the name-to-id and id-to-table maps. An unknown table name
+     * is logged as a warning and otherwise ignored.
+     */
+    @Override
+    public void dropTable(String tableName) {
+        LOG.debug("drop table [{}]", tableName);
+        makeSureInitialized();
+        Long tableId = tableNameToId.remove(tableName);
+        if (tableId == null) {
+            // Nothing is registered under this name, so idToTbl has no entry to remove either.
+            // Returning here also avoids calling idToTbl.remove(null), which throws on maps
+            // that reject null keys.
+            LOG.warn("drop table [{}] failed", tableName);
+            return;
+        }
+        idToTbl.remove(tableId);
+    }
+
+    /**
+     * Registers a new table under {@code tableName} with the given id in both lookup maps.
+     */
+    @Override
+    public void createTable(String tableName, long tableId) {
+        LOG.debug("create table [{}]", tableName);
+        makeSureInitialized();
+        tableNameToId.put(tableName, tableId);
+        PaimonExternalTable table = new PaimonExternalTable(tableId, tableName, name,
+                (PaimonExternalCatalog) extCatalog);
+        idToTbl.put(tableId, table);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
new file mode 100644
index 0000000000..c821160dd4
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.thrift.THiveTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DecimalType;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Table of a Paimon external catalog. Wraps the Paimon {@link Table} fetched from the owning
+ * {@link PaimonExternalCatalog} and converts its schema to Doris columns.
+ */
+public class PaimonExternalTable extends ExternalTable {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class);
+
+    /** Scale used for every Paimon timestamp column when mapped to Doris DATETIMEV2. */
+    public static final int PAIMON_DATETIME_SCALE_MS = 3;
+    // Paimon table handle, fetched from the catalog on first use by getOriginTable().
+    private Table originTable = null;
+
+    public PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) {
+        super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE);
+    }
+
+    /**
+     * @return the Paimon catalog type reported by the owning catalog
+     */
+    public String getPaimonCatalogType() {
+        return ((PaimonExternalCatalog) catalog).getPaimonCatalogType();
+    }
+
+    protected synchronized void makeSureInitialized() {
+        super.makeSureInitialized();
+        if (!objectCreated) {
+            objectCreated = true;
+        }
+    }
+
+    /**
+     * Returns the underlying Paimon table, asking the catalog for it on first use.
+     * The result is whatever {@code PaimonExternalCatalog.getPaimonTable} returned.
+     */
+    public Table getOriginTable() {
+        if (originTable == null) {
+            originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name);
+        }
+        return originTable;
+    }
+
+    /**
+     * Builds the Doris schema from the fields of the Paimon table schema, one column per
+     * field, carrying over the field name, description and id.
+     */
+    @Override
+    public List<Column> initSchema() {
+        Table table = getOriginTable();
+        TableSchema schema = ((AbstractFileStoreTable) table).schema();
+        List<DataField> columns = schema.fields();
+        List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+        for (DataField field : columns) {
+            tmpSchema.add(new Column(field.name(),
+                    paimonTypeToDorisType(field.type()), true, null, true, field.description(), true,
+                    field.id()));
+        }
+        return tmpSchema;
+    }
+
+    /**
+     * Maps a Paimon type to a Doris type by its type root.
+     *
+     * @throws IllegalArgumentException for any type root not listed in the switch
+     */
+    private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
+        switch (dataType.getTypeRoot()) {
+            case BOOLEAN:
+                return Type.BOOLEAN;
+            case INTEGER:
+                return Type.INT;
+            case BIGINT:
+                return Type.BIGINT;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case VARCHAR:
+            case BINARY:
+            case CHAR:
+                return Type.STRING;
+            case DECIMAL:
+                DecimalType decimal = (DecimalType) dataType;
+                return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale());
+            case DATE:
+                return ScalarType.createDateV2Type();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return ScalarType.createDatetimeV2Type(PAIMON_DATETIME_SCALE_MS);
+            case TIME_WITHOUT_TIME_ZONE:
+                return Type.UNSUPPORTED;
+            default:
+                throw new IllegalArgumentException("Cannot transform unknown type: " + dataType.getTypeRoot());
+        }
+    }
+
+    protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) {
+        return paimonPrimitiveTypeToDorisType(type);
+    }
+
+    /**
+     * Describes this table to the backend as a HIVE_TABLE descriptor.
+     *
+     * @throws IllegalArgumentException if the catalog type is anything other than
+     *         {@link PaimonExternalCatalog#PAIMON_HMS} (including when it is not set)
+     */
+    @Override
+    public TTableDescriptor toThrift() {
+        List<Column> schema = getFullSchema();
+        String catalogType = getPaimonCatalogType();
+        // Constant-first comparison: an unset catalog type falls through to the descriptive
+        // IllegalArgumentException below instead of a NullPointerException.
+        if (PaimonExternalCatalog.PAIMON_HMS.equals(catalogType)) {
+            THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
+            TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0,
+                    getName(), dbName);
+            tTableDescriptor.setHiveTable(tHiveTable);
+            return tTableDescriptor;
+        } else {
+            throw new IllegalArgumentException("Currently only supports hms catalog,not support :"
+                + catalogType);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index f530bcc5f8..358b53a274 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.Resource;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;
+import org.apache.doris.datasource.paimon.PaimonHMSExternalCatalog;
 import org.apache.doris.datasource.test.TestExternalCatalog;
 
 import com.google.common.base.Strings;
@@ -122,6 +123,9 @@ public class CatalogFactory {
             case "iceberg":
                 catalog = IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props, comment);
                 break;
+            case "paimon":
+                catalog = new PaimonHMSExternalCatalog(catalogId, name, resource, props, comment);
+                break;
             case "max_compute":
                 catalog = new MaxComputeExternalCatalog(catalogId, name, resource, props, comment);
                 break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index a23842c9bd..cf2de86494 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.external.HMSExternalDatabase;
 import org.apache.doris.catalog.external.IcebergExternalDatabase;
 import org.apache.doris.catalog.external.JdbcExternalDatabase;
 import org.apache.doris.catalog.external.MaxComputeExternalDatabase;
+import org.apache.doris.catalog.external.PaimonExternalDatabase;
 import org.apache.doris.catalog.external.TestExternalDatabase;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.DdlException;
@@ -448,6 +449,8 @@ public abstract class ExternalCatalog
                 //return new HudiExternalDatabase(this, dbId, dbName);
             case TEST:
                 return new TestExternalDatabase(this, dbId, dbName);
+            case PAIMON:
+                return new PaimonExternalDatabase(this, dbId, dbName);
             default:
                 break;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
index ecc284b325..73fbeeb781 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
@@ -37,6 +37,7 @@ public class InitCatalogLog implements Writable {
         ES,
         JDBC,
         ICEBERG,
+        PAIMON,
         MAX_COMPUTE,
         HUDI,
         TEST,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
index a49dd5232c..14cd4410ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
@@ -39,6 +39,7 @@ public class InitDatabaseLog implements Writable {
         JDBC,
         MAX_COMPUTE,
         HUDI,
+        PAIMON,
         TEST,
         UNKNOWN;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
new file mode 100644
index 0000000000..3c024fadfb
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.SessionContext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for Doris external catalogs backed by an Apache Paimon {@link Catalog}.
+ * Subclasses are expected to assign {@code catalog} and {@code paimonCatalogType}; this class
+ * itself never sets them.
+ */
+public abstract class PaimonExternalCatalog extends ExternalCatalog {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalCatalog.class);
+    /** Value of the Paimon catalog type for a Hive-metastore-backed catalog. */
+    public static final String PAIMON_HMS = "hms";
+    protected String paimonCatalogType;
+    protected Catalog catalog;
+
+    public PaimonExternalCatalog(long catalogId, String name, String comment) {
+        super(catalogId, name, InitCatalogLog.Type.PAIMON, comment);
+        this.type = "paimon";
+    }
+
+    @Override
+    protected void init() {
+        super.init();
+    }
+
+    /**
+     * Builds an HDFS configuration carrying every Hadoop property of this catalog.
+     */
+    protected Configuration getConfiguration() {
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
+        for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        return conf;
+    }
+
+    public Catalog getCatalog() {
+        makeSureInitialized();
+        return catalog;
+    }
+
+    public String getPaimonCatalogType() {
+        makeSureInitialized();
+        return paimonCatalogType;
+    }
+
+    /**
+     * Lists the database names of the Paimon catalog as a fresh mutable list.
+     * Unlike the other accessors, this method does not call makeSureInitialized().
+     */
+    protected List<String> listDatabaseNames() {
+        return new ArrayList<>(catalog.listDatabases());
+    }
+
+    @Override
+    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
+        makeSureInitialized();
+        return catalog.tableExists(Identifier.create(dbName, tblName));
+    }
+
+    /**
+     * Lists the tables of {@code dbName}.
+     *
+     * @return the table names, or an empty list (never {@code null}) when the database does
+     *         not exist in the Paimon catalog
+     */
+    @Override
+    public List<String> listTableNames(SessionContext ctx, String dbName) {
+        makeSureInitialized();
+        try {
+            return catalog.listTables(dbName);
+        } catch (Catalog.DatabaseNotExistException e) {
+            LOG.warn("DatabaseNotExistException", e);
+            // An empty list lets callers iterate the result without a null check.
+            return new ArrayList<>();
+        }
+    }
+
+    /**
+     * Looks up a table in the Paimon catalog.
+     *
+     * @return the Paimon table, or {@code null} when the table does not exist (the failure is
+     *         logged as a warning)
+     */
+    public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblName) {
+        makeSureInitialized();
+        org.apache.paimon.table.Table table = null;
+        try {
+            table = catalog.getTable(Identifier.create(dbName, tblName));
+        } catch (Catalog.TableNotExistException e) {
+            LOG.warn("TableNotExistException", e);
+        }
+        return table;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
new file mode 100644
index 0000000000..13775b0edf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Paimon external catalog that keeps its metadata in a Hive metastore.
+ *
+ * <p>The metastore URIs and the warehouse location are read from the catalog properties and
+ * handed to a Paimon {@link HiveCatalog} when the local objects are initialized.
+ */
+public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PaimonHMSExternalCatalog.class);
+    /** Paimon option key that selects the metastore implementation. */
+    public static final String METASTORE = "metastore";
+    /** Value stored under {@link #METASTORE} by this catalog. */
+    public static final String METASTORE_HIVE = "hive";
+    /** Paimon option key under which the metastore URIs are stored. */
+    public static final String URI = "uri";
+    private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+            ConfigOptions.key("metastore.client.class")
+            .stringType()
+            .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+            .withDescription(
+                "Class name of Hive metastore client.\n"
+                    + "NOTE: This class must directly implements "
+                    + "org.apache.hadoop.hive.metastore.IMetaStoreClient.");
+
+    /**
+     * Stores the converted catalog properties and tags the catalog as HMS-backed. The Paimon
+     * catalog itself is created later, in {@link #initLocalObjectsImpl()}.
+     *
+     * @param catalogId forwarded to the parent constructor
+     * @param name forwarded to the parent constructor
+     * @param resource resource name kept in the {@link CatalogProperty}
+     * @param props user properties, run through {@link PropertyConverter} before being stored
+     * @param comment forwarded to the parent constructor
+     */
+    public PaimonHMSExternalCatalog(long catalogId, String name, String resource,
+                                    Map<String, String> props, String comment) {
+        super(catalogId, name, comment);
+        Map<String, String> metaProps = PropertyConverter.convertToMetaProperties(props);
+        catalogProperty = new CatalogProperty(resource, metaProps);
+        paimonCatalogType = PAIMON_HMS;
+    }
+
+    /**
+     * Builds the Paimon options from the catalog properties (absent values fall back to the
+     * empty string) and creates the Hive-backed Paimon catalog.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} raised while creating the catalog
+     */
+    @Override
+    protected void initLocalObjectsImpl() {
+        Options paimonOptions = new Options();
+        paimonOptions.set(PaimonProperties.WAREHOUSE,
+                catalogProperty.getOrDefault(PaimonProperties.WAREHOUSE, ""));
+        // Currently, only supports hive
+        paimonOptions.set(METASTORE, METASTORE_HIVE);
+        paimonOptions.set(URI, catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, ""));
+        CatalogContext catalogContext = CatalogContext.create(paimonOptions, getConfiguration());
+        try {
+            catalog = create(catalogContext);
+        } catch (IOException ioe) {
+            LOG.warn("failed to create paimon external catalog ", ioe);
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    /**
+     * Assembles a {@link HiveConf} from the context options and instantiates the Paimon
+     * {@link HiveCatalog} on top of the warehouse file system.
+     *
+     * @param context catalog context carrying the Paimon options
+     * @return the newly created Hive-backed catalog
+     * @throws IOException if one of the Paimon calls made here fails with it
+     */
+    private Catalog create(CatalogContext context) throws IOException {
+        String warehouse = context.options().get(CatalogOptions.WAREHOUSE);
+        FileIO warehouseIO = FileIO.get(new Path(warehouse), context);
+        String metastoreUri = context.options().get(CatalogOptions.URI);
+        HiveConf conf = HiveCatalog.createHiveConf(
+                context.options().get(HiveCatalogOptions.HIVE_CONF_DIR),
+                context.options().get(HiveCatalogOptions.HADOOP_CONF_DIR));
+
+        // always using user-set parameters overwrite hive-site.xml parameters
+        context.options().toMap().forEach(conf::set);
+        conf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri);
+        // set the warehouse location to the hiveConf
+        conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouse);
+
+        String clientClass = context.options().get(METASTORE_CLIENT_CLASS);
+        return new HiveCatalog(warehouseIO, conf, clientClass, context.options().toMap());
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
similarity index 70%
copy from fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
index 6fc5d69544..e372dd5788 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
@@ -15,20 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner.external;
+package org.apache.doris.datasource.property.constants;
 
-public enum TableFormatType {
-    HIVE("hive"),
-    ICEBERG("iceberg"),
-    HUDI("hudi");
-
-    private final String tableFormatType;
-
-    TableFormatType(String tableFormatType) {
-        this.tableFormatType = tableFormatType;
-    }
-
-    public String value() {
-        return tableFormatType;
-    }
+public class PaimonProperties {
+    public static final String WAREHOUSE = "warehouse";
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 5d2b3c8f36..03f95dff5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -47,6 +47,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.catalog.external.IcebergExternalTable;
+import org.apache.doris.catalog.external.PaimonExternalTable;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
@@ -157,6 +158,7 @@ import org.apache.doris.planner.UnionNode;
 import org.apache.doris.planner.external.HiveScanNode;
 import org.apache.doris.planner.external.HudiScanNode;
 import org.apache.doris.planner.external.iceberg.IcebergScanNode;
+import org.apache.doris.planner.external.paimon.PaimonScanNode;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
 import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TFetchOption;
@@ -717,6 +719,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             }
         } else if (table instanceof IcebergExternalTable) {
             scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+        } else if (table instanceof PaimonExternalTable) {
+            scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
         }
         Preconditions.checkNotNull(scanNode);
         fileScan.getConjuncts().stream()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index bf7c0a5484..413a96eac2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -51,6 +51,8 @@ import org.apache.doris.catalog.external.JdbcExternalDatabase;
 import org.apache.doris.catalog.external.JdbcExternalTable;
 import org.apache.doris.catalog.external.MaxComputeExternalDatabase;
 import org.apache.doris.catalog.external.MaxComputeExternalTable;
+import org.apache.doris.catalog.external.PaimonExternalDatabase;
+import org.apache.doris.catalog.external.PaimonExternalTable;
 import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.EsExternalCatalog;
@@ -63,6 +65,8 @@ import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonHMSExternalCatalog;
 import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
 import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
 import org.apache.doris.load.routineload.AbstractDataSourceProperties;
@@ -194,6 +198,8 @@ public class GsonUtils {
             .registerSubtype(IcebergGlueExternalCatalog.class, IcebergGlueExternalCatalog.class.getSimpleName())
             .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName())
             .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName())
+            .registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName())
+            .registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName())
             .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName());
     // routine load data source
     private static RuntimeTypeAdapterFactory<AbstractDataSourceProperties> rdsTypeAdapterFactory =
@@ -208,6 +214,7 @@ public class GsonUtils {
             .registerSubtype(HMSExternalDatabase.class, HMSExternalDatabase.class.getSimpleName())
             .registerSubtype(JdbcExternalDatabase.class, JdbcExternalDatabase.class.getSimpleName())
             .registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName())
+            .registerSubtype(PaimonExternalDatabase.class, PaimonExternalDatabase.class.getSimpleName())
             .registerSubtype(MaxComputeExternalDatabase.class, MaxComputeExternalDatabase.class.getSimpleName());
 
     private static RuntimeTypeAdapterFactory<TableIf> tblTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
@@ -216,6 +223,7 @@ public class GsonUtils {
             .registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName())
             .registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName())
             .registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName())
+            .registerSubtype(PaimonExternalTable.class, PaimonExternalTable.class.getSimpleName())
             .registerSubtype(MaxComputeExternalTable.class, MaxComputeExternalTable.class.getSimpleName());
 
     // runtime adapter for class "HeartbeatResponse"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 0ae4a35edb..97de7d82ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -73,6 +73,7 @@ import org.apache.doris.planner.external.HiveScanNode;
 import org.apache.doris.planner.external.HudiScanNode;
 import org.apache.doris.planner.external.MaxComputeScanNode;
 import org.apache.doris.planner.external.iceberg.IcebergScanNode;
+import org.apache.doris.planner.external.paimon.PaimonScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.statistics.StatisticalType;
@@ -2019,6 +2020,9 @@ public class SingleNodePlanner {
             case ICEBERG_EXTERNAL_TABLE:
                 scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
                 break;
+            case PAIMON_EXTERNAL_TABLE:
+                scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                break;
             case MAX_COMPUTE_EXTERNAL_TABLE:
                 // TODO: support max compute scan node
                 scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "MCScanNode",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 508b174fb5..4c794d083f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -38,6 +38,8 @@ import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.external.iceberg.IcebergScanNode;
 import org.apache.doris.planner.external.iceberg.IcebergSplit;
+import org.apache.doris.planner.external.paimon.PaimonScanNode;
+import org.apache.doris.planner.external.paimon.PaimonSplit;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
@@ -279,6 +281,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
             if (fileSplit instanceof IcebergSplit) {
                 // TODO: extract all data lake split to factory
                 IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit);
+            } else if (fileSplit instanceof PaimonSplit) {
+                PaimonScanNode.setPaimonParams(rangeDesc, (PaimonSplit) fileSplit);
             }
 
             // if (fileSplit instanceof HudiSplit) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
index 6fc5d69544..f97c8ea1ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
@@ -20,7 +20,8 @@ package org.apache.doris.planner.external;
 public enum TableFormatType {
     HIVE("hive"),
     ICEBERG("iceberg"),
-    HUDI("hudi");
+    HUDI("hudi"),
+    PAIMON("paimon");
 
     private final String tableFormatType;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
new file mode 100644
index 0000000000..13da29b2bd
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.PaimonExternalTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3Util;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPaimonFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scan node for Paimon tables. It plans the Paimon splits on the frontend and attaches each
+ * serialized split, together with the scanned columns and the catalog coordinates, to the file
+ * range descriptor; the file format it reports is always JNI.
+ */
+public class PaimonScanNode extends FileQueryScanNode {
+    // NOTE(review): static because the static setPaimonParams() reads it, so all PaimonScanNode
+    // instances share whatever the most recent doInitialize() assigned. This looks unsafe when
+    // more than one Paimon scan node is planned at a time; making it per-instance requires
+    // changing setPaimonParams() and its caller.
+    private static PaimonSource source = null;
+
+    /** Creates the node; the arguments are passed on to the {@link FileQueryScanNode} constructor. */
+    public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
+        super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv);
+    }
+
+    /**
+     * Rejects views, then prepares the column filters, the backend policy, the Paimon source of
+     * the scanned table and the schema parameters.
+     *
+     * @throws UserException if the table is a view, or if one of the preparation steps fails
+     */
+    @Override
+    protected void doInitialize() throws UserException {
+        ExternalTable table = (ExternalTable) desc.getTable();
+        if (table.isView()) {
+            throw new AnalysisException(
+                String.format("Querying external view '%s.%s' is not supported", table.getDbName(), table.getName()));
+        }
+        computeColumnFilter();
+        initBackendPolicy();
+        // No null check afterwards: a constructor call never yields null.
+        source = new PaimonSource((PaimonExternalTable) table, desc, columnNameToRange);
+        initSchemaParams();
+    }
+
+    /**
+     * Fills the Paimon part of a file range descriptor: the serialized split and its length, the
+     * names/types/ids of the scanned columns (comma separated, in slot order) and the metastore
+     * URIs, warehouse, database name and table name of the scanned table.
+     *
+     * @param rangeDesc range descriptor that receives the table format parameters
+     * @param paimonSplit split whose serialized form is attached to the descriptor
+     */
+    public static void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
+        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+        tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
+        TPaimonFileDesc fileDesc = new TPaimonFileDesc();
+        // Serialize once; the same bytes provide both the payload and its length.
+        byte[] serializedSplit = paimonSplit.getSerializableSplit();
+        fileDesc.setPaimonSplit(serializedSplit);
+        fileDesc.setLengthByte(Integer.toString(serializedSplit.length));
+        //Paimon columnNames,columnTypes,columnIds that need to be transported into JNI
+        Map<String, Integer> paimonFieldIds = new HashMap<>();
+        Map<String, String> paimonFieldTypes = new HashMap<>();
+        for (DataField field : ((AbstractFileStoreTable) source.getPaimonTable()).schema().fields()) {
+            paimonFieldIds.put(field.name(), field.id());
+            paimonFieldTypes.put(field.name(), field.type().toString());
+        }
+        List<String> columnNames = new ArrayList<>();
+        List<String> columnTypes = new ArrayList<>();
+        List<String> columnIds = new ArrayList<>();
+        for (SlotDescriptor slot : source.getDesc().getSlots()) {
+            String columnName = slot.getColumn().getName();
+            columnNames.add(columnName);
+            // String.valueOf keeps the text "null" for a column that is missing from the schema.
+            columnTypes.add(String.valueOf(paimonFieldTypes.get(columnName)));
+            columnIds.add(String.valueOf(paimonFieldIds.get(columnName)));
+        }
+        fileDesc.setPaimonColumnIds(String.join(",", columnIds));
+        fileDesc.setPaimonColumnNames(String.join(",", columnNames));
+        fileDesc.setPaimonColumnTypes(String.join(",", columnTypes));
+        Map<String, String> catalogProps = source.getCatalog().getCatalogProperty().getProperties();
+        fileDesc.setHiveMetastoreUris(catalogProps.get(HiveConf.ConfVars.METASTOREURIS.varname));
+        fileDesc.setWarehouse(catalogProps.get(PaimonProperties.WAREHOUSE));
+        fileDesc.setDbName(((PaimonExternalTable) source.getTargetTable()).getDbName());
+        fileDesc.setTableName(source.getTargetTable().getName());
+        tableFormatFileDesc.setPaimonParams(fileDesc);
+        rangeDesc.setTableFormatParams(tableFormatFileDesc);
+    }
+
+    /**
+     * Plans a scan of the Paimon table and wraps every planned split in a {@link PaimonSplit}.
+     *
+     * @return one Doris split per planned Paimon split
+     */
+    @Override
+    public List<Split> getSplits() throws UserException {
+        List<Split> splits = new ArrayList<>();
+        ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
+        List<org.apache.paimon.table.source.Split> paimonSplits = readBuilder.newScan().plan().splits();
+        for (org.apache.paimon.table.source.Split split : paimonSplits) {
+            PaimonInputSplit inputSplit = new PaimonInputSplit("tempDir", (DataSplit) split);
+            PaimonSplit paimonSplit = new PaimonSplit(inputSplit,
+                    ((AbstractFileStoreTable) source.getPaimonTable()).location().toString());
+            paimonSplit.setTableFormatType(TableFormatType.PAIMON);
+            splits.add(paimonSplit);
+        }
+        return splits;
+    }
+
+    /**
+     * Derives the file system type from the table location.
+     *
+     * @throws DdlException if the location matches none of the recognized storage kinds
+     */
+    @Override
+    public TFileType getLocationType() throws DdlException, MetaNotFoundException {
+        String location = ((AbstractFileStoreTable) source.getPaimonTable()).location().toString();
+        if (location != null && !location.isEmpty()) {
+            if (S3Util.isObjStorage(location)) {
+                return TFileType.FILE_S3;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
+                return TFileType.FILE_HDFS;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
+                return TFileType.FILE_LOCAL;
+            }
+        }
+        // The table is a Paimon table, so say so instead of the copied "hms table" wording.
+        throw new DdlException("Unknown file location " + location
+            + " for paimon table " + source.getPaimonTable().name());
+    }
+
+    /** Always reports {@link TFileFormatType#FORMAT_JNI}. */
+    @Override
+    public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
+        return TFileFormatType.FORMAT_JNI;
+    }
+
+    /** Returns a copy of the partition keys of the Paimon table. */
+    @Override
+    public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
+        return new ArrayList<>(source.getPaimonTable().partitionKeys());
+    }
+
+    /** Delegates to the Paimon source. */
+    @Override
+    public TFileAttributes getFileAttributes() throws UserException {
+        return source.getFileAttributes();
+    }
+
+    /** Returns the Doris external table of the Paimon source. */
+    @Override
+    public TableIf getTargetTable() {
+        return source.getTargetTable();
+    }
+
+    /** Returns the properties of the catalog of the scanned table. */
+    @Override
+    public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
+        return source.getCatalog().getProperties();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
new file mode 100644
index 0000000000..2f55e30c08
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.PaimonExternalTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.thrift.TFileAttributes;
+
+import org.apache.paimon.table.Table;
+
+import java.util.Map;
+
+/**
+ * Bundles what a Paimon scan needs to know about its table: the Doris external table, the
+ * underlying Paimon table and the tuple descriptor of the scan.
+ */
+public class PaimonSource {
+    private final TupleDescriptor desc;
+    private final PaimonExternalTable paimonExtTable;
+    private final Table originTable;
+
+    /**
+     * Captures the external table, its Paimon origin table and the tuple descriptor.
+     *
+     * @param table Doris-side Paimon table; its origin table is looked up here
+     * @param desc tuple descriptor of the scan
+     * @param columnNameToRange accepted but not used by this constructor
+     */
+    public PaimonSource(PaimonExternalTable table, TupleDescriptor desc,
+                            Map<String, ColumnRange> columnNameToRange) {
+        this.desc = desc;
+        this.paimonExtTable = table;
+        this.originTable = table.getOriginTable();
+    }
+
+    /** Returns the tuple descriptor given at construction. */
+    public TupleDescriptor getDesc() {
+        return desc;
+    }
+
+    /** Returns the Paimon table obtained from the external table at construction. */
+    public Table getPaimonTable() {
+        return originTable;
+    }
+
+    /** Returns the Doris external table given at construction. */
+    public TableIf getTargetTable() {
+        return paimonExtTable;
+    }
+
+    /** Returns the catalog of the external table. */
+    public ExternalCatalog getCatalog() {
+        return paimonExtTable.getCatalog();
+    }
+
+    /** Returns a newly constructed {@link TFileAttributes}. */
+    public TFileAttributes getFileAttributes() throws UserException {
+        return new TFileAttributes();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
new file mode 100644
index 0000000000..e36740f0fd
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.planner.external.FileSplit;
+import org.apache.doris.planner.external.TableFormatType;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+/**
+ * File split that wraps a Paimon input split and can turn it into bytes via
+ * {@link #getSerializableSplit()}.
+ */
+public class PaimonSplit extends FileSplit {
+    private PaimonInputSplit split;
+    private TableFormatType tableFormatType;
+
+    /**
+     * Wraps a Paimon input split. Only the path is given to the parent constructor; its
+     * remaining arguments are passed as 0 or null.
+     *
+     * @param split Paimon input split to carry
+     * @param path path string used to build the parent's {@link Path}
+     */
+    public PaimonSplit(PaimonInputSplit split, String path) {
+        super(new Path(path), 0, 0, 0, null, null);
+        this.split = split;
+    }
+
+    public PaimonInputSplit getSplit() {
+        return split;
+    }
+
+    public void setSplit(PaimonInputSplit split) {
+        this.split = split;
+    }
+
+    public TableFormatType getTableFormatType() {
+        return tableFormatType;
+    }
+
+    public void setTableFormatType(TableFormatType tableFormatType) {
+        this.tableFormatType = tableFormatType;
+    }
+
+    /**
+     * Serializes the wrapped Paimon split with its own {@code write} method.
+     *
+     * @return the serialized bytes
+     * @throws UncheckedIOException if writing the split fails, so that a truncated byte array
+     *         is never handed on as if it were a complete split
+     */
+    public byte[] getSerializableSplit() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream output = new DataOutputStream(baos)) {
+            split.write(output);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to serialize paimon split", e);
+        }
+        return baos.toByteArray();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
index 35391191c7..ba22e067a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
@@ -51,6 +51,7 @@ public class DeriveFactory {
             case ES_SCAN_NODE:
             case HIVE_SCAN_NODE:
             case ICEBERG_SCAN_NODE:
+            case PAIMON_SCAN_NODE:
             case INTERSECT_NODE:
             case SCHEMA_SCAN_NODE:
             case STREAM_LOAD_SCAN_NODE:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index 3a4a283c79..67dd9bb054 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -31,6 +31,7 @@ public enum StatisticalType {
     HASH_JOIN_NODE,
     HIVE_SCAN_NODE,
     ICEBERG_SCAN_NODE,
+    PAIMON_SCAN_NODE,
     HUDI_SCAN_NODE,
     TVF_SCAN_NODE,
     INTERSECT_NODE,
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index 66d0123795..bcebf3871b 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -37,6 +37,7 @@ under the License.
         <presto.hadoop.version>2.7.4-11</presto.hadoop.version>
         <presto.hive.version>3.0.0-8</presto.hive.version>
         <hudi.version>0.12.2</hudi.version>
+        <paimon.version>0.4-SNAPSHOT</paimon.version>
     </properties>
 
     <dependencies>
@@ -50,6 +51,21 @@ under the License.
             <artifactId>fe-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-connector-2.3</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+        <dependency>
+            <artifactId>hive-common</artifactId>
+            <groupId>org.apache.hive</groupId>
+            <version>2.3.9</version>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
@@ -162,10 +178,7 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.doris</groupId>
-            <artifactId>hive-catalog-shade</artifactId>
-        </dependency>
+
     </dependencies>
     <build>
         <finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java
new file mode 100644
index 0000000000..03c8b6564e
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.utils.OffHeap;
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.PaimonColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+
+/**
+ * JNI scanner that reads the rows of one Paimon split and appends them column by column through
+ * {@code appendData}.
+ *
+ * <p>The table is looked up through a Paimon {@link HiveCatalog}; the split is deserialized from
+ * {@code lengthByte} bytes of off-heap memory starting at {@code splitAddress}.
+ */
+public class PaimonJniScanner extends JniScanner {
+    private static final Logger LOG = Logger.getLogger(PaimonJniScanner.class);
+
+    private final String metastoreUris;
+    private final String warehouse;
+    private final String dbName;
+    private final String tblName;
+    // Field positions of the required columns inside a Paimon row, as decimal strings.
+    private final String[] ids;
+    // Numeric form of ids; parsed once, when getNext() sees its first row.
+    private int[] columnIndexes;
+    // Off-heap address and byte length of the serialized PaimonInputSplit.
+    private final long splitAddress;
+    private final int lengthByte;
+    private PaimonInputSplit paimonInputSplit;
+    private Table table;
+    private RecordReader<InternalRow> reader;
+    // Reused for every cell: re-pointed with setOffsetRow/setIdx before each appendData call.
+    private final PaimonColumnValue columnValue = new PaimonColumnValue();
+
+    /**
+     * Reads the scan description from {@code params} and registers the output columns.
+     *
+     * @param batchSize forwarded to {@code initTableInfo}
+     * @param params scan parameters; the keys read are hive.metastore.uris, warehouse, split_byte,
+     *     length_byte, db_name, table_name, required_fields, columns_types, columns_id and, when
+     *     present, push_down_predicates
+     */
+    public PaimonJniScanner(int batchSize, Map<String, String> params) {
+        metastoreUris = params.get("hive.metastore.uris");
+        warehouse = params.get("warehouse");
+        splitAddress = Long.parseLong(params.get("split_byte"));
+        lengthByte = Integer.parseInt(params.get("length_byte"));
+        LOG.info("splitAddress:" + splitAddress);
+        LOG.info("lengthByte:" + lengthByte);
+        dbName = params.get("db_name");
+        tblName = params.get("table_name");
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] types = params.get("columns_types").split(",");
+        ids = params.get("columns_id").split(",");
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        ScanPredicate[] predicates = new ScanPredicate[0];
+        if (params.containsKey("push_down_predicates")) {
+            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("PaimonJniScanner gets pushed-down predicates:  " + ScanPredicate.dump(predicates));
+            }
+        }
+        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+    }
+
+    /**
+     * Resolves the table, deserializes the split from off-heap memory and creates the record reader.
+     *
+     * @throws IOException if the split cannot be deserialized or creating the reader fails
+     */
+    @Override
+    public void open() throws IOException {
+        getCatalog();
+        // Copy the serialized split out of off-heap memory and deserialize it.
+        byte[] splitByte = new byte[lengthByte];
+        OffHeap.copyMemory(null, splitAddress, splitByte, OffHeap.BYTE_ARRAY_OFFSET, lengthByte);
+        paimonInputSplit = new PaimonInputSplit();
+        // A deserialization failure is propagated instead of being printed and ignored: creating a
+        // reader from a split that was never filled in would only fail later, and less clearly.
+        paimonInputSplit.readFields(new DataInputStream(new ByteArrayInputStream(splitByte)));
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableRead read = readBuilder.newRead();
+        reader = read.createReader(paimonInputSplit.split());
+    }
+
+    /** Closes the record reader if {@link #open()} got as far as creating one. */
+    @Override
+    public void close() throws IOException {
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    /**
+     * Reads every remaining row of the split and appends the required columns of each one.
+     *
+     * <p>NOTE(review): the loop runs until the reader is exhausted and does not stop at the
+     * {@code batchSize} given to the constructor - confirm the caller can take that many rows at once.
+     *
+     * @return number of rows appended by this call
+     * @throws RuntimeException wrapping any {@link IOException} raised while reading
+     */
+    @Override
+    protected int getNext() throws IOException {
+        int rows = 0;
+        try {
+            RecordReader.RecordIterator<InternalRow> batch;
+            while ((batch = reader.readBatch()) != null) {
+                InternalRow record;
+                while ((record = batch.next()) != null) {
+                    if (columnIndexes == null) {
+                        columnIndexes = parseColumnIndexes();
+                    }
+                    columnValue.setOffsetRow((ColumnarRow) record);
+                    for (int i = 0; i < columnIndexes.length; i++) {
+                        columnValue.setIdx(columnIndexes[i]);
+                        appendData(i, columnValue);
+                    }
+                    rows++;
+                }
+                batch.releaseBatch();
+            }
+        } catch (IOException e) {
+            LOG.warn("failed to getNext columnValue ", e);
+            throw new RuntimeException(e);
+        }
+        return rows;
+    }
+
+    // Converts ids to ints once so that the per-row loop does not re-parse the same strings.
+    private int[] parseColumnIndexes() {
+        int[] indexes = new int[ids.length];
+        for (int i = 0; i < ids.length; i++) {
+            indexes[i] = Integer.parseInt(ids[i]);
+        }
+        return indexes;
+    }
+
+    // Builds a HiveCatalog from the warehouse, uri and conf-dir options carried by context.
+    private Catalog create(CatalogContext context) throws IOException {
+        Path warehousePath = new Path(context.options().get(CatalogOptions.WAREHOUSE));
+        FileIO fileIO;
+        fileIO = FileIO.get(warehousePath, context);
+        String uri = context.options().get(CatalogOptions.URI);
+        String hiveConfDir = context.options().get(HiveCatalogOptions.HIVE_CONF_DIR);
+        String hadoopConfDir = context.options().get(HiveCatalogOptions.HADOOP_CONF_DIR);
+        HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir);
+
+        // always using user-set parameters overwrite hive-site.xml parameters
+        context.options().toMap().forEach(hiveConf::set);
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
+        // set the warehouse location to the hiveConf
+        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, context.options().get(CatalogOptions.WAREHOUSE));
+
+        String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);
+
+        return new HiveCatalog(fileIO, hiveConf, clientClassName, context.options().toMap());
+    }
+
+    // Creates the catalog for warehouse/metastoreUris and loads dbName.tblName into table.
+    private void getCatalog() {
+        Options options = new Options();
+        options.set("warehouse", warehouse);
+        // Currently, only supports hive
+        options.set("metastore", "hive");
+        options.set("uri", metastoreUris);
+        CatalogContext context = CatalogContext.create(options);
+        try {
+            Catalog catalog = create(context);
+            table = catalog.getTable(Identifier.create(dbName, tblName));
+        } catch (IOException | Catalog.TableNotExistException e) {
+            LOG.warn("failed to create paimon external catalog ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+            ConfigOptions.key("metastore.client.class")
+            .stringType()
+            .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+            .withDescription(
+                "Class name of Hive metastore client.\n"
+                    + "NOTE: This class must directly implements "
+                    + "org.apache.hadoop.hive.metastore.IMetaStoreClient.");
+}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java
new file mode 100644
index 0000000000..3d8bb5e42e
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni.vec;
+
+import org.apache.paimon.data.columnar.ColumnarRow;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+
+/**
+ * {@link ColumnValue} view over one field of a Paimon {@link ColumnarRow}.
+ *
+ * <p>An instance can be reused: {@link #setOffsetRow} selects the row and {@link #setIdx} the
+ * field that the getters read.
+ */
+public class PaimonColumnValue implements ColumnValue {
+    private int idx;
+    private ColumnarRow record;
+
+    public PaimonColumnValue() {
+    }
+
+    /** Selects the field position inside the current row that the getters read. */
+    public void setIdx(int idx) {
+        this.idx = idx;
+    }
+
+    /** Selects the row that the getters read. */
+    public void setOffsetRow(ColumnarRow record) {
+        this.record = record;
+    }
+
+    @Override
+    public boolean getBoolean() {
+        return record.getBoolean(idx);
+    }
+
+    @Override
+    public byte getByte() {
+        return record.getByte(idx);
+    }
+
+    @Override
+    public short getShort() {
+        return record.getShort(idx);
+    }
+
+    @Override
+    public int getInt() {
+        return record.getInt(idx);
+    }
+
+    @Override
+    public float getFloat() {
+        return record.getFloat(idx);
+    }
+
+    @Override
+    public long getLong() {
+        return record.getLong(idx);
+    }
+
+    @Override
+    public double getDouble() {
+        return record.getDouble(idx);
+    }
+
+    @Override
+    public BigInteger getBigInteger() {
+        // NOTE(review): only the int value of the field is read here - confirm this is intended.
+        return BigInteger.valueOf(record.getInt(idx));
+    }
+
+    @Override
+    public BigDecimal getDecimal() {
+        // NOTE(review): the field is read as a double, so the precision and scale of a decimal
+        // column are not carried over - confirm whether the row's decimal accessor should be used.
+        return BigDecimal.valueOf(getDouble());
+    }
+
+    @Override
+    public String getString() {
+        return record.getString(idx).toString();
+    }
+
+    @Override
+    public LocalDate getDate() {
+        // Paimon stores a DATE as an int counting days since 1970-01-01; no time zone is involved.
+        return LocalDate.ofEpochDay(record.getInt(idx));
+    }
+
+    @Override
+    public LocalDateTime getDateTime() {
+        // NOTE(review): precision 3 and the UTC+8 offset are hard-coded - confirm against the
+        // column type and the session time zone.
+        return Instant.ofEpochMilli(record.getTimestamp(idx, 3)
+            .getMillisecond()).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
+    }
+
+    @Override
+    public boolean isNull() {
+        // Report the field's real null flag instead of claiming that every value is null.
+        return record.isNullAt(idx);
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return record.getBinary(idx);
+    }
+
+    // Nested types are not unpacked: the three methods below leave their output lists untouched.
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+
+    }
+}
diff --git a/fe/pom.xml b/fe/pom.xml
index e2da4103d3..5cb5f36482 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -195,7 +195,7 @@ under the License.
         <doris.home>${fe.dir}/../</doris.home>
         <revision>1.2-SNAPSHOT</revision>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <doris.hive.catalog.shade.version>1.0.3-SNAPSHOT</doris.hive.catalog.shade.version>
+        <doris.hive.catalog.shade.version>1.0.4-SNAPSHOT</doris.hive.catalog.shade.version>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <!--plugin parameters-->
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 9c98cd4d28..d68007e69a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -186,8 +186,8 @@ struct TBrokerScanRangeParams {
     1: required i8 column_separator;
     2: required i8 line_delimiter;
 
-    // We construct one line in file to a tuple. And each field of line 
-    // correspond to a slot in this tuple. 
+    // We construct one line in file to a tuple. And each field of line
+    // correspond to a slot in this tuple.
     // src_tuple_id is the tuple id of the input file
     3: required Types.TTupleId src_tuple_id
     // src_slot_ids is the slot_ids of the input file
@@ -288,6 +288,19 @@ struct TIcebergFileDesc {
     5: optional Exprs.TExpr file_select_conjunct;
 }
 
+struct TPaimonFileDesc { // Paimon-specific parameters of a table-format file description
+    1: optional binary paimon_split // presumably the serialized Paimon split read by the scanner - TODO confirm
+    2: optional string paimon_column_ids // presumably comma-separated field positions - TODO confirm
+    3: optional string paimon_column_types // presumably comma-separated column types - TODO confirm
+    4: optional string paimon_column_names // presumably comma-separated column names - TODO confirm
+    5: optional string hive_metastore_uris // Hive metastore URI(s) of the catalog
+    6: optional string warehouse // warehouse location of the catalog
+    7: optional string db_name // database that owns the table
+    8: optional string table_name // table to read
+    9: optional string length_byte // NOTE(review): looks like the byte length of paimon_split, sent as text - confirm
+}
+
+
 struct THudiFileDesc {
     1: optional string basePath;
     2: optional string dataFilePath;
@@ -300,6 +313,7 @@ struct TTableFormatFileDesc {
     1: optional string table_format_type
     2: optional TIcebergFileDesc iceberg_params
     3: optional THudiFileDesc hudi_params
+    4: optional TPaimonFileDesc paimon_params
 }
 
 struct TFileScanRangeParams {
@@ -566,7 +580,7 @@ struct TSortInfo {
   4: optional list<Exprs.TExpr> sort_tuple_slot_exprs
 
   // Indicates the nullable info of sort_tuple_slot_exprs is changed after substitute by child's smap
-  5: optional list<bool> slot_exprs_nullability_changed_flags   
+  5: optional list<bool> slot_exprs_nullability_changed_flags
   // Indicates whether topn query using two phase read
   6: optional bool use_two_phase_read
 }
@@ -606,7 +620,7 @@ struct TEqJoinCondition {
   // right-hand side of "<a> = <b>"
   2: required Exprs.TExpr right;
   // operator of equal join
-  3: optional Opcodes.TExprOpcode opcode; 
+  3: optional Opcodes.TExprOpcode opcode;
 }
 
 enum TJoinOp {
@@ -709,7 +723,7 @@ enum TAggregationOp {
   DENSE_RANK,
   ROW_NUMBER,
   LAG,
-  HLL_C, 
+  HLL_C,
   BITMAP_UNION,
   NTILE,
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org