Posted to commits@doris.apache.org by "morningman (via GitHub)" <gi...@apache.org> on 2023/06/02 09:14:34 UTC

[GitHub] [doris] morningman commented on a diff in pull request #19681: [Feature](multi-catalog)support paimon catalog

morningman commented on code in PR #19681:
URL: https://github.com/apache/doris/pull/19681#discussion_r1214108063


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.SessionContext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class PaimonExternalCatalog extends ExternalCatalog {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalCatalog.class);
+    public static final String PAIMON_HMS = "hms";
+    protected String paimonCatalogType;
+    protected Catalog catalog;
+
+    public PaimonExternalCatalog(long catalogId, String name) {
+        super(catalogId, name, InitCatalogLog.Type.PAIMON);
+        this.type = "paimon";
+    }
+
+    @Override
+    protected void init() {
+        super.init();
+    }
+
+    protected Configuration getConfiguration() {
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
+        for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        return conf;
+    }
+
+    public Catalog getCatalog() {
+        makeSureInitialized();
+        return catalog;
+    }
+
+    public String getPaimonCatalogType() {
+        makeSureInitialized();
+        return paimonCatalogType;
+    }
+
+    protected List<String> listDatabaseNames() {
+        return catalog.listDatabases().stream()
+            .map(e -> {
+                String dbName = e.toString();
+                try {
+                    FeNameFormat.checkDbName(dbName);

Review Comment:
   I think we don't need to check it
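   If the check is dropped, this method could shrink to something like this (a minimal sketch, assuming Paimon's `Catalog.listDatabases()` already returns `List<String>` and `java.util.ArrayList` is imported):
   ```java
   protected List<String> listDatabaseNames() {
       // Paimon already returns plain database names; just copy them out.
       return new ArrayList<>(catalog.listDatabases());
   }
   ```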



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.SessionContext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class PaimonExternalCatalog extends ExternalCatalog {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalCatalog.class);
+    public static final String PAIMON_HMS = "hms";
+    protected String paimonCatalogType;
+    protected Catalog catalog;
+
+    public PaimonExternalCatalog(long catalogId, String name) {
+        super(catalogId, name, InitCatalogLog.Type.PAIMON);
+        this.type = "paimon";
+    }
+
+    @Override
+    protected void init() {
+        super.init();
+    }
+
+    protected Configuration getConfiguration() {
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
+        for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        return conf;
+    }
+
+    public Catalog getCatalog() {
+        makeSureInitialized();
+        return catalog;
+    }
+
+    public String getPaimonCatalogType() {
+        makeSureInitialized();
+        return paimonCatalogType;
+    }
+
+    protected List<String> listDatabaseNames() {
+        return catalog.listDatabases().stream()
+            .map(e -> {
+                String dbName = e.toString();
+                try {
+                    FeNameFormat.checkDbName(dbName);
+                } catch (AnalysisException ex) {
+                    Util.logAndThrowRuntimeException(LOG,
+                            String.format("Not a supported name format: %s", dbName), ex);
+                }
+                return dbName;
+            })
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
+        makeSureInitialized();
+        return catalog.tableExists(Identifier.create(dbName, tblName));
+    }
+
+    @Override
+    public List<String> listTableNames(SessionContext ctx, String dbName) {
+        makeSureInitialized();
+        List<String> tableNames = null;
+        try {
+            tableNames = catalog.listTables(dbName);
+        } catch (Catalog.DatabaseNotExistException e) {
+            LOG.warn("DatabaseNotExistException", e);

Review Comment:
   Print the catalog and db name in the log.
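   A sketch of what that could look like (assuming `getName()` returns this catalog's name, as in `ExternalCatalog`):
   ```java
   // Include both the catalog name and the database name to make debugging easier.
   LOG.warn("database " + dbName + " does not exist in catalog " + getName(), e);
   ```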



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.StringUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+
+
+public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {
+
+    public static final String METASTORE = "metastore";
+    public static final String METASTORE_HIVE = "hive";
+    public static final String URI = "uri";
+    private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+            ConfigOptions.key("metastore.client.class")
+            .stringType()
+            .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+            .withDescription(
+                "Class name of Hive metastore client.\n"
+                    + "NOTE: This class must directly implement "
+                    + "org.apache.hadoop.hive.metastore.IMetaStoreClient.");
+
+    public PaimonHMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
+        super(catalogId, name);
+        props = PropertyConverter.convertToMetaProperties(props);
+        catalogProperty = new CatalogProperty(resource, props);
+    }
+
+    @Override
+    protected void initLocalObjectsImpl() {
+        paimonCatalogType = PAIMON_HMS;

Review Comment:
   This should be done in the constructor.
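   A sketch of moving the assignment (assumes nothing else relies on it being set lazily):
   ```java
   public PaimonHMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
       super(catalogId, name);
       // The catalog type is fixed for this subclass, so set it eagerly.
       paimonCatalogType = PAIMON_HMS;
       props = PropertyConverter.convertToMetaProperties(props);
       catalogProperty = new CatalogProperty(resource, props);
   }
   ```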



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.SessionContext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class PaimonExternalCatalog extends ExternalCatalog {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalCatalog.class);
+    public static final String PAIMON_HMS = "hms";
+    protected String paimonCatalogType;
+    protected Catalog catalog;
+
+    public PaimonExternalCatalog(long catalogId, String name) {
+        super(catalogId, name, InitCatalogLog.Type.PAIMON);
+        this.type = "paimon";
+    }
+
+    @Override
+    protected void init() {
+        super.init();
+    }
+
+    protected Configuration getConfiguration() {
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
+        for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        return conf;
+    }
+
+    public Catalog getCatalog() {
+        makeSureInitialized();
+        return catalog;
+    }
+
+    public String getPaimonCatalogType() {
+        makeSureInitialized();
+        return paimonCatalogType;
+    }
+
+    protected List<String> listDatabaseNames() {
+        return catalog.listDatabases().stream()
+            .map(e -> {
+                String dbName = e.toString();
+                try {
+                    FeNameFormat.checkDbName(dbName);
+                } catch (AnalysisException ex) {
+                    Util.logAndThrowRuntimeException(LOG,
+                            String.format("Not a supported name format: %s", dbName), ex);
+                }
+                return dbName;
+            })
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
+        makeSureInitialized();
+        return catalog.tableExists(Identifier.create(dbName, tblName));
+    }
+
+    @Override
+    public List<String> listTableNames(SessionContext ctx, String dbName) {
+        makeSureInitialized();
+        List<String> tableNames = null;
+        try {
+            tableNames = catalog.listTables(dbName);
+        } catch (Catalog.DatabaseNotExistException e) {
+            LOG.warn("DatabaseNotExistException", e);
+        }
+        return tableNames;
+    }
+
+    public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblName) {
+        makeSureInitialized();
+        org.apache.paimon.table.Table table = null;
+        try {
+            table = catalog.getTable(Identifier.create(dbName, tblName));
+        } catch (Catalog.TableNotExistException e) {
+            LOG.warn("TableNotExistException", e);

Review Comment:
   ditto
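   E.g., following the same pattern (a sketch, with the same `getName()` assumption as above):
   ```java
   LOG.warn("table " + dbName + "." + tblName + " does not exist in catalog " + getName(), e);
   ```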



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.StringUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+
+
+public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {
+
+    public static final String METASTORE = "metastore";
+    public static final String METASTORE_HIVE = "hive";
+    public static final String URI = "uri";
+    private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+            ConfigOptions.key("metastore.client.class")
+            .stringType()
+            .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+            .withDescription(
+                "Class name of Hive metastore client.\n"
+                    + "NOTE: This class must directly implement "
+                    + "org.apache.hadoop.hive.metastore.IMetaStoreClient.");
+
+    public PaimonHMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
+        super(catalogId, name);
+        props = PropertyConverter.convertToMetaProperties(props);
+        catalogProperty = new CatalogProperty(resource, props);
+    }
+
+    @Override
+    protected void initLocalObjectsImpl() {
+        paimonCatalogType = PAIMON_HMS;
+        String metastoreUris = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
+        String warehouse = catalogProperty.getOrDefault(PaimonProperties.WAREHOUSE, "");
+        Options options = new Options();
+        options.set(PaimonProperties.WAREHOUSE, warehouse);
+        // Currently, only hive is supported
+        options.set(METASTORE, METASTORE_HIVE);
+        options.set(URI, metastoreUris);
+        CatalogContext context = CatalogContext.create(options, getConfiguration());
+        try {
+            catalog = create(context);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public Catalog create(CatalogContext context) throws IOException {

Review Comment:
   ```suggestion
       private Catalog create(CatalogContext context) throws IOException {
   ```



##########
fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.utils.OffHeap;
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.PaimonColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+
+public class PaimonJniScanner extends JniScanner {
+    private static final Logger LOG = Logger.getLogger(PaimonJniScanner.class);
+
+    private final String metastoreUris;
+    private final String warehouse;
+    private final String dbName;
+    private final String tblName;
+    private final String[] ids;
+    private final long splitAddress;
+    private final int lengthByte;
+    private PaimonInputSplit paimonInputSplit;
+    private Table table;
+    private RecordReader<InternalRow> reader;
+    private final PaimonColumnValue columnValue = new PaimonColumnValue();
+
+    public PaimonJniScanner(int batchSize, Map<String, String> params) {
+        metastoreUris = params.get("hive.metastore.uris");
+        warehouse = params.get("warehouse");
+        splitAddress = Long.parseLong(params.get("split_byte"));
+        lengthByte = Integer.parseInt(params.get("length_byte"));
+        LOG.info("splitAddress:" + splitAddress);
+        LOG.info("lengthByte:" + lengthByte);
+        dbName = params.get("db_name");
+        tblName = params.get("table_name");
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] types = params.get("columns_types").split(",");
+        ids = params.get("columns_id").split(",");
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        ScanPredicate[] predicates = new ScanPredicate[0];
+        if (params.containsKey("push_down_predicates")) {
+            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("PaimonJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
+            }
+        }
+        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+    }
+
+    @Override
+    public void open() throws IOException {
+        getCatalog();
+        // 拿 []byte 反序列化成 split (deserialize the byte[] back into a split)

Review Comment:
   Please write this comment in English.



##########
fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.utils.OffHeap;
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.PaimonColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+
+public class PaimonJniScanner extends JniScanner {
+    private static final Logger LOG = Logger.getLogger(PaimonJniScanner.class);
+
+    private final String metastoreUris;
+    private final String warehouse;
+    private final String dbName;
+    private final String tblName;
+    private final String[] ids;
+    private final long splitAddress;
+    private final int lengthByte;
+    private PaimonInputSplit paimonInputSplit;
+    private Table table;
+    private RecordReader<InternalRow> reader;
+    private final PaimonColumnValue columnValue = new PaimonColumnValue();
+
+    public PaimonJniScanner(int batchSize, Map<String, String> params) {
+        metastoreUris = params.get("hive.metastore.uris");
+        warehouse = params.get("warehouse");
+        splitAddress = Long.parseLong(params.get("split_byte"));
+        lengthByte = Integer.parseInt(params.get("length_byte"));
+        LOG.info("splitAddress:" + splitAddress);
+        LOG.info("lengthByte:" + lengthByte);
+        dbName = params.get("db_name");
+        tblName = params.get("table_name");
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] types = params.get("columns_types").split(",");
+        ids = params.get("columns_id").split(",");
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        ScanPredicate[] predicates = new ScanPredicate[0];
+        if (params.containsKey("push_down_predicates")) {
+            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("PaimonJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
+            }
+        }
+        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+    }
+
+    @Override
+    public void open() throws IOException {
+        getCatalog();
+        // 拿 []byte 反序列化成 split (deserialize the byte[] back into a split)
+        byte[] splitByte = new byte[lengthByte];
+        OffHeap.copyMemory(null, splitAddress, splitByte, OffHeap.BYTE_ARRAY_OFFSET, lengthByte);
+        LOG.info("splitBytes:" + Arrays.toString(splitByte));

Review Comment:
   ```suggestion
           LOG.debug("splitBytes: {}", Arrays.toString(splitByte));
   ```
   
   And if `Arrays.toString(splitByte)` is expensive, better to guard it with `LOG.isDebugEnabled()`.
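   E.g. (a sketch; log4j's `isDebugEnabled()` avoids building the string when debug logging is off):
   ```java
   if (LOG.isDebugEnabled()) {
       // Arrays.toString() walks the whole byte[], so only do it when needed.
       LOG.debug("splitBytes: " + Arrays.toString(splitByte));
   }
   ```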



##########
fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.utils.OffHeap;
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.PaimonColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+
+public class PaimonJniScanner extends JniScanner {
+    private static final Logger LOG = Logger.getLogger(PaimonJniScanner.class);
+
+    private final String metastoreUris;
+    private final String warehouse;
+    private final String dbName;
+    private final String tblName;
+    private final String[] ids;
+    private final long splitAddress;
+    private final int lengthByte;
+    private PaimonInputSplit paimonInputSplit;
+    private Table table;
+    private RecordReader<InternalRow> reader;
+    private final PaimonColumnValue columnValue = new PaimonColumnValue();
+
+    public PaimonJniScanner(int batchSize, Map<String, String> params) {
+        metastoreUris = params.get("hive.metastore.uris");
+        warehouse = params.get("warehouse");
+        splitAddress = Long.parseLong(params.get("split_byte"));
+        lengthByte = Integer.parseInt(params.get("length_byte"));
+        LOG.info("splitAddress:" + splitAddress);
+        LOG.info("lengthByte:" + lengthByte);
+        dbName = params.get("db_name");
+        tblName = params.get("table_name");
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] types = params.get("columns_types").split(",");
+        ids = params.get("columns_id").split(",");
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        ScanPredicate[] predicates = new ScanPredicate[0];
+        if (params.containsKey("push_down_predicates")) {
+            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("PaimonJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
+            }
+        }
+        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+    }
+
+    @Override
+    public void open() throws IOException {
+        getCatalog();
+        // 拿 []byte 反序列化成 split (deserialize the byte[] back into a split)
+        byte[] splitByte = new byte[lengthByte];
+        OffHeap.copyMemory(null, splitAddress, splitByte, OffHeap.BYTE_ARRAY_OFFSET, lengthByte);
+        LOG.info("splitBytes:" + Arrays.toString(splitByte));
+        ByteArrayInputStream bais = new ByteArrayInputStream(splitByte);
+        DataInputStream input = new DataInputStream(bais);
+        LOG.info("input:" + input);

Review Comment:
   ditto



##########
fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.utils.OffHeap;
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.PaimonColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+
+public class PaimonJniScanner extends JniScanner {
+    private static final Logger LOG = Logger.getLogger(PaimonJniScanner.class);
+
+    private final String metastoreUris;
+    private final String warehouse;
+    private final String dbName;
+    private final String tblName;
+    private final String[] ids;
+    private final long splitAddress;
+    private final int lengthByte;
+    private PaimonInputSplit paimonInputSplit;
+    private Table table;
+    private RecordReader<InternalRow> reader;
+    private final PaimonColumnValue columnValue = new PaimonColumnValue();
+
+    public PaimonJniScanner(int batchSize, Map<String, String> params) {
+        metastoreUris = params.get("hive.metastore.uris");
+        warehouse = params.get("warehouse");
+        splitAddress = Long.parseLong(params.get("split_byte"));
+        lengthByte = Integer.parseInt(params.get("length_byte"));
+        LOG.info("splitAddress:" + splitAddress);
+        LOG.info("lengthByte:" + lengthByte);
+        dbName = params.get("db_name");
+        tblName = params.get("table_name");
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] types = params.get("columns_types").split(",");
+        ids = params.get("columns_id").split(",");
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        ScanPredicate[] predicates = new ScanPredicate[0];
+        if (params.containsKey("push_down_predicates")) {
+            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("PaimonJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
+            }
+        }
+        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+    }
+
+    @Override
+    public void open() throws IOException {
+        getCatalog();
+        // 拿 []byte 反序列化成 split (deserialize the byte[] back into a split)
+        byte[] splitByte = new byte[lengthByte];
+        OffHeap.copyMemory(null, splitAddress, splitByte, OffHeap.BYTE_ARRAY_OFFSET, lengthByte);
+        LOG.info("splitBytes:" + Arrays.toString(splitByte));
+        ByteArrayInputStream bais = new ByteArrayInputStream(splitByte);
+        DataInputStream input = new DataInputStream(bais);
+        LOG.info("input:" + input);
+        try {
+            paimonInputSplit.readFields(input);
+        } catch (IOException e) {
+            e.printStackTrace();

Review Comment:
   Throw it instead of just printing the stack trace.
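   A sketch of propagating it (`open()` already declares `throws IOException`):
   ```java
   try {
       paimonInputSplit.readFields(input);
   } catch (IOException e) {
       LOG.warn("Failed to deserialize the paimon input split", e);
       throw e;  // let the caller see the failure instead of swallowing it
   }
   ```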



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.StringUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+
+
+public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {
+
+    public static final String METASTORE = "metastore";
+    public static final String METASTORE_HIVE = "hive";
+    public static final String URI = "uri";
+    private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+            ConfigOptions.key("metastore.client.class")
+            .stringType()
+            .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+            .withDescription(
+                "Class name of Hive metastore client.\n"
+                    + "NOTE: This class must directly implement "
+                    + "org.apache.hadoop.hive.metastore.IMetaStoreClient.");
+
+    public PaimonHMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
+        super(catalogId, name);
+        props = PropertyConverter.convertToMetaProperties(props);
+        catalogProperty = new CatalogProperty(resource, props);
+    }
+
+    @Override
+    protected void initLocalObjectsImpl() {
+        paimonCatalogType = PAIMON_HMS;
+        String metastoreUris = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
+        String warehouse = catalogProperty.getOrDefault(PaimonProperties.WAREHOUSE, "");
+        Options options = new Options();
+        options.set(PaimonProperties.WAREHOUSE, warehouse);
+        // Currently, only hive is supported
+        options.set(METASTORE, METASTORE_HIVE);
+        options.set(URI, metastoreUris);
+        CatalogContext context = CatalogContext.create(options, getConfiguration());
+        try {
+            catalog = create(context);
+        } catch (IOException e) {
+            e.printStackTrace();

Review Comment:
   Use `LOG` to print it, and throw an exception.
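   A sketch of that (assumes a `LOG` field and the `Util` helper from `PaimonExternalCatalog` are made available in this class):
   ```java
   try {
       catalog = create(context);
   } catch (IOException e) {
       // Logs the error and rethrows it wrapped in a RuntimeException.
       Util.logAndThrowRuntimeException(LOG, "Failed to create paimon hms catalog: " + getName(), e);
   }
   ```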



##########
fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.utils.OffHeap;
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.PaimonColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+
+public class PaimonJniScanner extends JniScanner {
+    private static final Logger LOG = Logger.getLogger(PaimonJniScanner.class);
+
+    private final String metastoreUris;
+    private final String warehouse;
+    private final String dbName;
+    private final String tblName;
+    private final String[] ids;
+    private final long splitAddress;
+    private final int lengthByte;
+    private PaimonInputSplit paimonInputSplit;
+    private Table table;
+    private RecordReader<InternalRow> reader;
+    private final PaimonColumnValue columnValue = new PaimonColumnValue();
+
+    public PaimonJniScanner(int batchSize, Map<String, String> params) {
+        metastoreUris = params.get("hive.metastore.uris");
+        warehouse = params.get("warehouse");
+        splitAddress = Long.parseLong(params.get("split_byte"));
+        lengthByte = Integer.parseInt(params.get("length_byte"));
+        LOG.info("splitAddress:" + splitAddress);
+        LOG.info("lengthByte:" + lengthByte);
+        dbName = params.get("db_name");
+        tblName = params.get("table_name");
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] types = params.get("columns_types").split(",");
+        ids = params.get("columns_id").split(",");
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        ScanPredicate[] predicates = new ScanPredicate[0];
+        if (params.containsKey("push_down_predicates")) {
+            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("PaimonJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
+            }
+        }
+        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+    }
+
+    @Override
+    public void open() throws IOException {
+        getCatalog();
+        // 拿 []byte 反序列化成 split (deserialize the byte[] back into a split)
+        byte[] splitByte = new byte[lengthByte];
+        OffHeap.copyMemory(null, splitAddress, splitByte, OffHeap.BYTE_ARRAY_OFFSET, lengthByte);
+        LOG.info("splitBytes:" + Arrays.toString(splitByte));
+        ByteArrayInputStream bais = new ByteArrayInputStream(splitByte);
+        DataInputStream input = new DataInputStream(bais);
+        LOG.info("input:" + input);
+        try {
+            paimonInputSplit.readFields(input);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableRead read = readBuilder.newRead();
+        reader = read.createReader(paimonInputSplit.split());
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    protected int getNext() throws IOException {
+        int rows = 0;
+        try {
+            RecordReader.RecordIterator batch;
+            while ((batch = reader.readBatch()) != null) {
+                Object record;
+                while ((record = batch.next()) != null) {
+                    columnValue.setOffsetRow((ColumnarRow) record);
+                    for (int i = 0; i < ids.length; i++) {
+                        columnValue.setIdx(Integer.parseInt(ids[i]));
+                        appendData(i, columnValue);
+                    }
+                    rows++;
+                }
+                batch.releaseBatch();
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return rows;
+    }
+
+    public Catalog create(CatalogContext context) throws IOException {
+        Path warehousePath = new Path(context.options().get(CatalogOptions.WAREHOUSE));
+        FileIO fileIO;
+        fileIO = FileIO.get(warehousePath, context);
+        String uri = context.options().get(CatalogOptions.URI);
+        String hiveConfDir = context.options().get(HiveCatalogOptions.HIVE_CONF_DIR);
+        String hadoopConfDir = context.options().get(HiveCatalogOptions.HADOOP_CONF_DIR);
+        HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir);
+
+        // always using user-set parameters overwrite hive-site.xml parameters
+        context.options().toMap().forEach(hiveConf::set);
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
+        // set the warehouse location to the hiveConf
+        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, context.options().get(CatalogOptions.WAREHOUSE));
+
+        String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);
+
+        return new HiveCatalog(fileIO, hiveConf, clientClassName, context.options().toMap());
+    }
+
+    public void getCatalog() {

Review Comment:
   private



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.PaimonExternalTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3Util;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPaimonFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import avro.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PaimonScanNode extends FileQueryScanNode {
+    private static PaimonSource source = null;
+
+    public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
+        super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv);
+    }
+
+    @Override
+    protected void doInitialize() throws UserException {
+        ExternalTable table = (ExternalTable) desc.getTable();
+        if (table.isView()) {
+            throw new AnalysisException(
+                String.format("Querying external view '%s.%s' is not supported", table.getDbName(), table.getName()));
+        }
+        computeColumnFilter();
+        initBackendPolicy();
+        source = new PaimonSource((PaimonExternalTable) table, desc, columnNameToRange);
+        Preconditions.checkNotNull(source);
+        initSchemaParams();
+    }
+
+    public static void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
+        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+        tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
+        TPaimonFileDesc fileDesc = new TPaimonFileDesc();
+        fileDesc.setPaimonSplit(paimonSplit.getSerializableSplit());
+        fileDesc.setLengthByte(Integer.toString(paimonSplit.getSerializableSplit().length));
+        StringBuilder columnNamesBuilder = new StringBuilder();
+        StringBuilder columnTypesBuilder = new StringBuilder();
+        StringBuilder columnIdsBuilder = new StringBuilder();
+        Map<String, Integer> paimonFieldsId = new HashMap<>();
+        Map<String, String> paimonFieldsName = new HashMap<>();
+        for (DataField field : ((AbstractFileStoreTable) source.getPaimonTable()).schema().fields()) {
+            paimonFieldsId.put(field.name(), field.id());
+            paimonFieldsName.put(field.name(), field.type().toString());
+        }
+        boolean isFirst = true;
+        for (SlotDescriptor slot : source.getDesc().getSlots()) {
+            if (!isFirst) {
+                columnNamesBuilder.append(",");

Review Comment:
   Could you add a comment in the code to explain these formats?
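   Something like this, for example (an illustrative sketch of the format built here):
   ```java
   // The required columns are serialized for the BE as three comma-separated
   // lists in the same order, matching what PaimonJniScanner parses, e.g.
   //   required_fields: "k1,v1"
   //   columns_types:   "INT,STRING"
   //   columns_id:      "0,2"
   ```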



##########
fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java:
##########
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.utils.OffHeap;
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.PaimonColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+
+public class PaimonJniScanner extends JniScanner {
+    private static final Logger LOG = Logger.getLogger(PaimonJniScanner.class);
+
+    private final String metastoreUris;
+    private final String warehouse;
+    private final String dbName;
+    private final String tblName;
+    private final String[] ids;
+    private final long splitAddress;
+    private final int lengthByte;
+    private PaimonInputSplit paimonInputSplit;
+    private Table table;
+    private RecordReader<InternalRow> reader;
+    private final PaimonColumnValue columnValue = new PaimonColumnValue();
+
+    public PaimonJniScanner(int batchSize, Map<String, String> params) {
+        metastoreUris = params.get("hive.metastore.uris");
+        warehouse = params.get("warehouse");
+        splitAddress = Long.parseLong(params.get("split_byte"));
+        lengthByte = Integer.parseInt(params.get("length_byte"));
+        LOG.info("splitAddress:" + splitAddress);
+        LOG.info("lengthByte:" + lengthByte);
+        dbName = params.get("db_name");
+        tblName = params.get("table_name");
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] types = params.get("columns_types").split(",");
+        ids = params.get("columns_id").split(",");
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        ScanPredicate[] predicates = new ScanPredicate[0];
+        if (params.containsKey("push_down_predicates")) {
+            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("PaimonJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
+            }
+        }
+        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+    }
+
+    @Override
+    public void open() throws IOException {
+        getCatalog();
+        // 拿 []byte 反序列化成 split (deserialize the byte[] back into a split)
+        byte[] splitByte = new byte[lengthByte];
+        OffHeap.copyMemory(null, splitAddress, splitByte, OffHeap.BYTE_ARRAY_OFFSET, lengthByte);
+        LOG.info("splitBytes:" + Arrays.toString(splitByte));
+        ByteArrayInputStream bais = new ByteArrayInputStream(splitByte);
+        DataInputStream input = new DataInputStream(bais);
+        LOG.info("input:" + input);
+        try {
+            paimonInputSplit.readFields(input);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableRead read = readBuilder.newRead();
+        reader = read.createReader(paimonInputSplit.split());
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    protected int getNext() throws IOException {
+        int rows = 0;
+        try {
+            RecordReader.RecordIterator batch;
+            while ((batch = reader.readBatch()) != null) {
+                Object record;
+                while ((record = batch.next()) != null) {
+                    columnValue.setOffsetRow((ColumnarRow) record);
+                    for (int i = 0; i < ids.length; i++) {
+                        columnValue.setIdx(Integer.parseInt(ids[i]));
+                        appendData(i, columnValue);
+                    }
+                    rows++;
+                }
+                batch.releaseBatch();
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return rows;
+    }
+
+    public Catalog create(CatalogContext context) throws IOException {

Review Comment:
   ```suggestion
       private Catalog create(CatalogContext context) throws IOException {
   ```


