Posted to issues@paimon.apache.org by "leaves12138 (via GitHub)" <gi...@apache.org> on 2023/11/22 08:05:07 UTC

[PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

leaves12138 opened a new pull request, #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368

   ### Purpose
   
   Introduce a procedure to migrate a table from Hive to Paimon,
   or to add files from a Hive table to an existing Paimon table.
   
   We should use `FlinkGenericCatalog` to migrate the table.
   
   Example:
   ```sql
   --Migrate
   CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic', 'hive-conf-dir' = 'xxx');
   USE CATALOG PAIMON_GE;
   CALL migrate_table('default.hivetable');
   ```
   ```sql
   --Add file
   CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic', 'hive-conf-dir' = 'xxx');
   USE CATALOG PAIMON_GE;
   CALL add_file('hivetable', 'paimontable', false, false);
   ```
   
   ### Tests
   
   <!-- List UT and IT cases to verify this change -->
   
   IT case added.
   
   <!-- Does this change affect API or storage format -->
   
   ### Documentation
   
   The documentation is still being written. I am opening the pull request first.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368#discussion_r1405546364


##########
paimon-core/src/main/java/org/apache/paimon/migrate/DataTypeWriter.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.migrate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import java.math.BigDecimal;
+
+/** Generate different converter to write data. */
+public class DataTypeWriter implements DataTypeVisitor<DataConverter> {

Review Comment:
   Can you use `TypeUtils.castFromString` and `BinaryWriter.createValueSetter`?
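
The suggestion above is to reuse existing utilities rather than maintain a dedicated visitor that builds one converter per type. As a rough illustration of the "cast from string" idea only, here is a minimal standard-library sketch. `StringCastSketch`, `SimpleType`, and `castFromText` are hypothetical names invented for this note; they do not reproduce the signature or the type coverage of Paimon's `TypeUtils.castFromString`, and where the converter is applied in the PR (for example to string-encoded partition values) is an assumption.

```java
import java.math.BigDecimal;
import java.time.LocalDate;

/** Hypothetical sketch, not Paimon code: turn a string-encoded value into a typed object. */
class StringCastSketch {

    /** A deliberately small, made-up set of types for illustration. */
    enum SimpleType { INT, BIGINT, DOUBLE, BOOLEAN, DECIMAL, DATE, STRING }

    /** Parses {@code text} according to {@code type}; malformed input throws a runtime exception. */
    static Object castFromText(String text, SimpleType type) {
        switch (type) {
            case INT:
                return Integer.valueOf(text);
            case BIGINT:
                return Long.valueOf(text);
            case DOUBLE:
                return Double.valueOf(text);
            case BOOLEAN:
                return Boolean.valueOf(text);
            case DECIMAL:
                return new BigDecimal(text);
            case DATE:
                // Expects ISO-8601 dates such as 2023-11-22.
                return LocalDate.parse(text);
            case STRING:
                return text;
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(castFromText("42", SimpleType.INT));
        System.out.println(castFromText("2023-11-22", SimpleType.DATE));
    }
}
```

A single type-driven entry point like this keeps the parsing rules in one place, which is the maintenance benefit the review comment appears to be after.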



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Migrate procedure to migrate hive table to paimon table. */
+public class MigrateTableProcedure extends GenericProcedureBase {
+
+    private static final String BACK_SUFFIX = "_backup_";
+
+    @Override
+    public String identifier() {
+        return "migrate_table";
+    }
+
+    public String[] call(ProcedureContext procedureContext, String sourceTablePath)
+            throws Exception {
+        return call(procedureContext, sourceTablePath, "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext, String sourceTablePath, String properties)
+            throws Exception {
+        TableEnvironmentImpl tableEnvironment =
+                TableEnvironmentImpl.create(EnvironmentSettings.inBatchMode());
+        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+
+        CatalogTable sourceFlinkTable =

Review Comment:
   Move this logic into paimon-hive; we don't need to rely on Flink Catalog APIs.



##########
docs/content/migration/migration-from-hive.md:
##########
@@ -0,0 +1,80 @@
+---
+title: "Migration From Hive"
+weight: 1
+type: docs
+aliases:
+- /migration/migration-from-hive.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hive Table Migration
+
+Apache Hive supports ORC, Parquet file formats that could be migrated to Paimon. 
+When migrating data to a paimon table, the origin table will be permanently disappeared. So please back up your data if you
+still need the original table. The migrated table will be [unware-bucket append-only table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}).
+
+Now, we can use flink generic catalog with Migrate Table Procedure and Migrate File Procedure to totally migrate a table from hive to paimon.

Review Comment:
   We should try not to limit this to the Flink generic catalog; this feature can be engine-agnostic.
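
One way to read the two comments above (move the logic out of the Flink procedure, keep the feature engine-agnostic) is a small engine-neutral contract that each engine's procedure simply delegates to. The following is a sketch under that assumption. Only the method name `executeMigrate(boolean sync)` mirrors the `HiveMigrator` code quoted later in this thread; `MigratorSketch`, `RecordingMigrator`, `FlinkProcedureSketch`, and `SparkProcedureSketch` are hypothetical stand-ins, and the checked exception on the real method is dropped for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical engine-neutral contract; it knows nothing about Flink or Spark. */
interface MigratorSketch {
    void executeMigrate(boolean sync);
}

/** Stand-in for a catalog-side implementation; it only records what it was asked to do. */
class RecordingMigrator implements MigratorSketch {
    final List<String> log = new ArrayList<>();
    private final String sourceTable;

    RecordingMigrator(String sourceTable) {
        this.sourceTable = sourceTable;
    }

    @Override
    public void executeMigrate(boolean sync) {
        log.add("migrate " + sourceTable + " sync=" + sync);
    }
}

/** Hypothetical Flink-side entry point: handles engine concerns, then delegates. */
class FlinkProcedureSketch {
    static String[] call(MigratorSketch migrator) {
        migrator.executeMigrate(true);
        return new String[] {"Success"};
    }
}

/** Hypothetical Spark-side entry point reusing the same migrator. */
class SparkProcedureSketch {
    static boolean call(MigratorSketch migrator) {
        migrator.executeMigrate(true);
        return true;
    }
}
```

With this split, the migration steps live in one module and each engine integration stays a thin adapter.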



##########
paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java:
##########
@@ -104,7 +104,7 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOExceptio
         DataFilePathFactory dataFilePathFactory =
                 pathFactory.createDataFilePathFactory(split.partition(), split.bucket());
         List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new ArrayList<>();
-        if (split.beforeFiles().size() > 0) {

Review Comment:
   There is no need to switch to `isEmpty` here; `size() > 0` is also fine.
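
For reference, the two spellings of the check are behaviorally equivalent for a `java.util.List`, so the point above is about avoiding an unrelated change in the diff rather than about correctness. The replacement line is not shown in the quoted hunk, so the `isEmpty` form below is an assumption about what the PR had changed it to; `EmptinessCheckSketch` is a hypothetical helper, not Paimon code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not Paimon code: both methods answer "does the list have elements?". */
class EmptinessCheckSketch {

    /** The style kept in the reviewed line. */
    static boolean hasFilesBySize(List<String> files) {
        return files.size() > 0;
    }

    /** The alternative style (assumed to be what the diff introduced). */
    static boolean hasFilesByIsEmpty(List<String> files) {
        return !files.isEmpty();
    }

    public static void main(String[] args) {
        List<String> beforeFiles = new ArrayList<>();
        System.out.println(hasFilesBySize(beforeFiles) == hasFilesByIsEmpty(beforeFiles));
        beforeFiles.add("data-0.orc");
        System.out.println(hasFilesBySize(beforeFiles) == hasFilesByIsEmpty(beforeFiles));
    }
}
```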





Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368




Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368#discussion_r1405703538


##########
docs/content/migration/migration-from-hive.md:
##########
@@ -0,0 +1,80 @@
+---
+title: "Migration From Hive"
+weight: 1
+type: docs
+aliases:
+- /migration/migration-from-hive.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hive Table Migration
+
+Apache Hive supports ORC, Parquet file formats that could be migrated to Paimon. 
+When migrating data to a paimon table, the origin table will be permanently disappeared. So please back up your data if you
+still need the original table. The migrated table will be [unware-bucket append-only table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}).
+
+Now, we can use flink generic catalog with Migrate Table Procedure and Migrate File Procedure to totally migrate a table from hive to paimon.

Review Comment:
   done



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Migrate procedure to migrate hive table to paimon table. */
+public class MigrateTableProcedure extends GenericProcedureBase {
+
+    private static final String BACK_SUFFIX = "_backup_";
+
+    @Override
+    public String identifier() {
+        return "migrate_table";
+    }
+
+    public String[] call(ProcedureContext procedureContext, String sourceTablePath)
+            throws Exception {
+        return call(procedureContext, sourceTablePath, "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext, String sourceTablePath, String properties)
+            throws Exception {
+        TableEnvironmentImpl tableEnvironment =
+                TableEnvironmentImpl.create(EnvironmentSettings.inBatchMode());
+        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+
+        CatalogTable sourceFlinkTable =

Review Comment:
   done





Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368#discussion_r1406998389


##########
paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.migrate;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
+
+/** Migrate hive table to paimon table. */
+public class HiveMigrator implements Migrator {
+
+    private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
+            p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
+
+    private final FileIO fileIO;
+    private final HiveCatalog hiveCatalog;
+    private final IMetaStoreClient client;
+    private final String sourceDatabase;
+    private final String sourceTable;
+    private final String targetDatabase;
+    private final String targetTable;
+    private final Map<String, String> options;
+
+    public HiveMigrator(
+            HiveCatalog hiveCatalog,
+            String sourceDatabase,
+            String sourceTable,
+            String targetDatabase,
+            String targetTable,
+            Map<String, String> options) {
+        this.hiveCatalog = hiveCatalog;
+        this.fileIO = hiveCatalog.fileIO();
+        this.client = hiveCatalog.getHmsClient();
+        this.sourceDatabase = sourceDatabase;
+        this.sourceTable = sourceTable;
+        this.targetDatabase = targetDatabase;
+        this.targetTable = targetTable;
+        this.options = options;
+    }
+
+    public void executeMigrate(boolean sync) throws Exception {
+        if (!client.tableExists(sourceDatabase, sourceTable)) {
+            throw new RuntimeException("Source hive table does not exist");
+        }
+
+        Table sourceHiveTable = client.getTable(sourceDatabase, sourceTable);
+        Map<String, String> properties = new HashMap<>(sourceHiveTable.getParameters());
+        checkPrimaryKey();
+
+        AbstractFileStoreTable paimonTable =
+                createPaimonTableIfNotExists(
+                        client.getSchema(sourceDatabase, sourceTable),
+                        sourceHiveTable.getPartitionKeys(),
+                        properties);
+        checkPaimonTable(paimonTable);
+
+        List<String> partitionsNames =
+                client.listPartitionNames(sourceDatabase, sourceTable, Short.MAX_VALUE);
+        checkCompatible(sourceHiveTable, paimonTable);
+
+        List<MigrateTask> tasks = new ArrayList<>();
+        if (partitionsNames.isEmpty()) {
+            tasks.add(importUnPartitionedTableTask(fileIO, sourceHiveTable, paimonTable));
+        } else {
+            tasks.addAll(
+                    importPartitionedTableTask(
+                            client, fileIO, partitionsNames, sourceHiveTable, paimonTable));
+        }
+
+        if (sync) {
+            List<CommitMessage> commitMessages = new ArrayList<>();
+            tasks.forEach(task -> commitMessages.add(task.get()));
+            paimonTable.newBatchWriteBuilder().newCommit().commit(commitMessages);
+        } else {
+            Queue<CommitMessage> commitMessages = new LinkedBlockingQueue<>();
+            List<Future<?>> futures = new ArrayList<>();
+            ExecutorService executors =
+                    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

Review Comment:
   done
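
The quoted `executeMigrate` hunk branches on `sync`: either run each task inline and collect its commit message, or hand the tasks to a fixed thread pool and collect the messages in a concurrent queue. The hunk is cut off before the asynchronous branch finishes, so the following is only a sketch of how that pattern can be completed with the standard library. `MigrateTaskRunnerSketch` is a hypothetical name, plain strings stand in for `CommitMessage`, and the final commit step is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

/** Hypothetical sketch: run tasks inline or on a pool and gather their results. */
class MigrateTaskRunnerSketch {

    static List<String> run(List<Supplier<String>> tasks, boolean sync) {
        if (sync) {
            // Inline execution keeps results in task order.
            List<String> messages = new ArrayList<>();
            tasks.forEach(task -> messages.add(task.get()));
            return messages;
        }

        // A thread-safe queue, because several workers add results concurrently.
        Queue<String> messages = new LinkedBlockingQueue<>();
        ExecutorService executor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Supplier<String> task : tasks) {
                futures.add(executor.submit(() -> { messages.add(task.get()); }));
            }
            // Wait for every task so that a failure surfaces before anything is committed.
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for migrate tasks", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("A migrate task failed", e.getCause());
        } finally {
            // Always release the worker threads, even when a task fails.
            executor.shutdown();
        }
        return new ArrayList<>(messages);
    }
}
```

In the pooled branch the result order depends on scheduling, so callers should not rely on it. Shutting the pool down in `finally` avoids leaking threads when a task throws.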





Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368#discussion_r1405730809


##########
paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java:
##########
@@ -104,7 +104,7 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOExceptio
         DataFilePathFactory dataFilePathFactory =
                 pathFactory.createDataFilePathFactory(split.partition(), split.bucket());
         List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new ArrayList<>();
-        if (split.beforeFiles().size() > 0) {

Review Comment:
   reverted





Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368#discussion_r1405996773


##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrationTableProcedure.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.StructType;
+
+/** Test. */
+public class MigrationTableProcedure extends BaseProcedure {

Review Comment:
   done



##########
paimon-flink/paimon-flink-common/pom.xml:
##########
@@ -120,6 +121,20 @@ under the License.
             </exclusions>
         </dependency>
 
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>

Review Comment:
   done





Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368#discussion_r1406037809


##########
docs/content/migration/migration-from-hive.md:
##########
@@ -0,0 +1,81 @@
+---
+title: "Migration From Hive"
+weight: 1
+type: docs
+aliases:
+- /migration/migration-from-hive.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hive Table Migration
+
+Apache Hive supports ORC, Parquet file formats that could be migrated to Paimon. 
+When migrating data to a paimon table, the origin table will be permanently disappeared. So please back up your data if you
+still need the original table. The migrated table will be [unaware-bucket append-only table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}).
+
+Now, we can use paimon hive catalog with Migrate Table Procedure and Migrate File Procedure to totally migrate a table from hive to paimon.
+
+* Migrate Table Procedure: Paimon table does not exist, use the procedure upgrade hive table to paimon table. Hive table will disappear after action done.
+* Migrate File Procedure:  Paimon table already exists, use the procedure to migrate files from hive table to paimon table. **Notice that, Hive table will also disappear after action done.**
+
+<span style="color: red; "> **We highly recommend to back up hive table data before migrating, because migrating action is not atomic. If been interrupted while migrating, you may lose your data.** </span>

Review Comment:
   done
   



##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrationTableProcedure.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.StructType;
+
+/** Test. */
+public class MigrationTableProcedure extends BaseProcedure {

Review Comment:
   done



##########
paimon-flink/paimon-flink-common/pom.xml:
##########
@@ -120,6 +121,20 @@ under the License.
             </exclusions>
         </dependency>
 
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>

Review Comment:
   done





Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368#issuecomment-1827667990

   Added the Migrate Table Action and its test to the project.




Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368#discussion_r1405743821


##########
paimon-core/src/main/java/org/apache/paimon/migrate/DataTypeWriter.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.migrate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import java.math.BigDecimal;
+
+/** Generate different converter to write data. */
+public class DataTypeWriter implements DataTypeVisitor<DataConverter> {

Review Comment:
   done





Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368#discussion_r1406999293


##########
paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.migrate;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
+
+/** Migrate hive table to paimon table. */
+public class HiveMigrator implements Migrator {
+
+    private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
+            p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
+
+    private final FileIO fileIO;
+    private final HiveCatalog hiveCatalog;
+    private final IMetaStoreClient client;
+    private final String sourceDatabase;
+    private final String sourceTable;
+    private final String targetDatabase;
+    private final String targetTable;
+    private final Map<String, String> options;
+
+    public HiveMigrator(
+            HiveCatalog hiveCatalog,
+            String sourceDatabase,
+            String sourceTable,
+            String targetDatabase,
+            String targetTable,
+            Map<String, String> options) {
+        this.hiveCatalog = hiveCatalog;
+        this.fileIO = hiveCatalog.fileIO();
+        this.client = hiveCatalog.getHmsClient();
+        this.sourceDatabase = sourceDatabase;
+        this.sourceTable = sourceTable;
+        this.targetDatabase = targetDatabase;
+        this.targetTable = targetTable;
+        this.options = options;
+    }
+
+    public void executeMigrate(boolean sync) throws Exception {
+        if (!client.tableExists(sourceDatabase, sourceTable)) {
+            throw new RuntimeException("Source hive table does not exist");
+        }
+
+        Table sourceHiveTable = client.getTable(sourceDatabase, sourceTable);
+        Map<String, String> properties = new HashMap<>(sourceHiveTable.getParameters());
+        checkPrimaryKey();
+
+        AbstractFileStoreTable paimonTable =
+                createPaimonTableIfNotExists(
+                        client.getSchema(sourceDatabase, sourceTable),
+                        sourceHiveTable.getPartitionKeys(),
+                        properties);
+        checkPaimonTable(paimonTable);
+
+        List<String> partitionsNames =
+                client.listPartitionNames(sourceDatabase, sourceTable, Short.MAX_VALUE);
+        checkCompatible(sourceHiveTable, paimonTable);
+
+        List<MigrateTask> tasks = new ArrayList<>();
+        if (partitionsNames.isEmpty()) {
+            tasks.add(importUnPartitionedTableTask(fileIO, sourceHiveTable, paimonTable));
+        } else {
+            tasks.addAll(
+                    importPartitionedTableTask(
+                            client, fileIO, partitionsNames, sourceHiveTable, paimonTable));
+        }
+
+        if (sync) {

Review Comment:
   Reasonable, we don't need a single-thread mode. At first I wanted to return a Future<MigrateResult>, but it seems that is not necessary.
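
For illustration only, here is a minimal sketch of the alternative mentioned above: a single asynchronous entry point that returns a future, so a caller who wants blocking behaviour simply waits on it and no `sync` flag is needed. The class `AsyncMigrateSketch`, the fields of `MigrateResult`, and the use of `String` in place of `CommitMessage` are all assumptions made for this example and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch: one asynchronous entry point instead of a sync flag. */
public class AsyncMigrateSketch {

    /** Hypothetical result type; the PR does not show its fields. */
    public static final class MigrateResult {
        public final List<String> commitMessages;

        MigrateResult(List<String> commitMessages) {
            this.commitMessages = commitMessages;
        }
    }

    /** Starts every task asynchronously and combines them into one future. */
    public static CompletableFuture<MigrateResult> executeMigrate(
            List<Supplier<String>> tasks) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Supplier<String> task : tasks) {
            // Runs on the JVM common pool; a real implementation would pick its own executor.
            futures.add(CompletableFuture.supplyAsync(task));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(
                        ignored -> {
                            // All futures are complete here, so join() does not block.
                            List<String> messages = new ArrayList<>();
                            for (CompletableFuture<String> future : futures) {
                                messages.add(future.join());
                            }
                            return new MigrateResult(messages);
                        });
    }
}
```

A caller that needs the old synchronous behaviour would call `executeMigrate(tasks).join()`; results are collected in task order.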





Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368#discussion_r1405910358


##########
paimon-flink/paimon-flink-common/pom.xml:
##########
@@ -120,6 +121,20 @@ under the License.
             </exclusions>
         </dependency>
 
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>

Review Comment:
   remove this



##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrationTableProcedure.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.StructType;
+
+/** Test. */
+public class MigrationTableProcedure extends BaseProcedure {

Review Comment:
   Remove this



##########
docs/content/migration/migration-from-hive.md:
##########
@@ -0,0 +1,81 @@
+---
+title: "Migration From Hive"
+weight: 1
+type: docs
+aliases:
+- /migration/migration-from-hive.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hive Table Migration
+
+Apache Hive supports the ORC and Parquet file formats, both of which can be migrated to Paimon.
+When data is migrated to a Paimon table, the original table disappears permanently, so back up your data if you
+still need the original table. The migrated table will be an [unaware-bucket append-only table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}).
+
+You can use the Paimon Hive catalog with the Migrate Table Procedure and the Migrate File Procedure to fully migrate a table from Hive to Paimon.
+
+* Migrate Table Procedure: the Paimon table does not exist yet; use this procedure to upgrade the Hive table to a Paimon table. The Hive table disappears once the action completes.
+* Migrate File Procedure: the Paimon table already exists; use this procedure to migrate files from the Hive table to the Paimon table. **Note that the Hive table also disappears once the action completes.**
+
+<span style="color: red; "> **We highly recommend backing up the Hive table data before migrating, because the migration is not atomic. If it is interrupted, you may lose data.** </span>

Review Comment:
   Document file format.





Re: [PR] [flink] [hive] Introduce procedure to migrate table from hive to paimon [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2368:
URL: https://github.com/apache/incubator-paimon/pull/2368#discussion_r1406263261


##########
paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.migrate;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
+
+/** Migrate hive table to paimon table. */
+public class HiveMigrator implements Migrator {
+
+    private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
+            p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
+
+    private final FileIO fileIO;
+    private final HiveCatalog hiveCatalog;
+    private final IMetaStoreClient client;
+    private final String sourceDatabase;
+    private final String sourceTable;
+    private final String targetDatabase;
+    private final String targetTable;
+    private final Map<String, String> options;
+
+    public HiveMigrator(
+            HiveCatalog hiveCatalog,
+            String sourceDatabase,
+            String sourceTable,
+            String targetDatabase,
+            String targetTable,
+            Map<String, String> options) {
+        this.hiveCatalog = hiveCatalog;
+        this.fileIO = hiveCatalog.fileIO();
+        this.client = hiveCatalog.getHmsClient();
+        this.sourceDatabase = sourceDatabase;
+        this.sourceTable = sourceTable;
+        this.targetDatabase = targetDatabase;
+        this.targetTable = targetTable;
+        this.options = options;
+    }
+
+    public void executeMigrate(boolean sync) throws Exception {
+        if (!client.tableExists(sourceDatabase, sourceTable)) {
+            throw new RuntimeException("Source hive table does not exist");
+        }
+
+        Table sourceHiveTable = client.getTable(sourceDatabase, sourceTable);
+        Map<String, String> properties = new HashMap<>(sourceHiveTable.getParameters());
+        checkPrimaryKey();
+
+        AbstractFileStoreTable paimonTable =
+                createPaimonTableIfNotExists(
+                        client.getSchema(sourceDatabase, sourceTable),
+                        sourceHiveTable.getPartitionKeys(),
+                        properties);
+        checkPaimonTable(paimonTable);
+
+        List<String> partitionsNames =
+                client.listPartitionNames(sourceDatabase, sourceTable, Short.MAX_VALUE);
+        checkCompatible(sourceHiveTable, paimonTable);
+
+        List<MigrateTask> tasks = new ArrayList<>();
+        if (partitionsNames.isEmpty()) {
+            tasks.add(importUnPartitionedTableTask(fileIO, sourceHiveTable, paimonTable));
+        } else {
+            tasks.addAll(
+                    importPartitionedTableTask(
+                            client, fileIO, partitionsNames, sourceHiveTable, paimonTable));
+        }
+
+        if (sync) {

Review Comment:
   Why do we need a sync mode? It is actually just a single-thread mode.



##########
paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.migrate;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
+
+/** Migrate hive table to paimon table. */
+public class HiveMigrator implements Migrator {
+
+    private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
+            p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
+
+    private final FileIO fileIO;
+    private final HiveCatalog hiveCatalog;
+    private final IMetaStoreClient client;
+    private final String sourceDatabase;
+    private final String sourceTable;
+    private final String targetDatabase;
+    private final String targetTable;
+    private final Map<String, String> options;
+
+    public HiveMigrator(
+            HiveCatalog hiveCatalog,
+            String sourceDatabase,
+            String sourceTable,
+            String targetDatabase,
+            String targetTable,
+            Map<String, String> options) {
+        this.hiveCatalog = hiveCatalog;
+        this.fileIO = hiveCatalog.fileIO();
+        this.client = hiveCatalog.getHmsClient();
+        this.sourceDatabase = sourceDatabase;
+        this.sourceTable = sourceTable;
+        this.targetDatabase = targetDatabase;
+        this.targetTable = targetTable;
+        this.options = options;
+    }
+
+    public void executeMigrate(boolean sync) throws Exception {
+        if (!client.tableExists(sourceDatabase, sourceTable)) {
+            throw new RuntimeException("Source hive table does not exist");
+        }
+
+        Table sourceHiveTable = client.getTable(sourceDatabase, sourceTable);
+        Map<String, String> properties = new HashMap<>(sourceHiveTable.getParameters());
+        checkPrimaryKey();
+
+        AbstractFileStoreTable paimonTable =
+                createPaimonTableIfNotExists(
+                        client.getSchema(sourceDatabase, sourceTable),
+                        sourceHiveTable.getPartitionKeys(),
+                        properties);
+        checkPaimonTable(paimonTable);
+
+        List<String> partitionsNames =
+                client.listPartitionNames(sourceDatabase, sourceTable, Short.MAX_VALUE);
+        checkCompatible(sourceHiveTable, paimonTable);
+
+        List<MigrateTask> tasks = new ArrayList<>();
+        if (partitionsNames.isEmpty()) {
+            tasks.add(importUnPartitionedTableTask(fileIO, sourceHiveTable, paimonTable));
+        } else {
+            tasks.addAll(
+                    importPartitionedTableTask(
+                            client, fileIO, partitionsNames, sourceHiveTable, paimonTable));
+        }
+
+        if (sync) {
+            List<CommitMessage> commitMessages = new ArrayList<>();
+            tasks.forEach(task -> commitMessages.add(task.get()));
+            paimonTable.newBatchWriteBuilder().newCommit().commit(commitMessages);
+        } else {
+            Queue<CommitMessage> commitMessages = new LinkedBlockingQueue<>();
+            List<Future<?>> futures = new ArrayList<>();
+            ExecutorService executors =
+                    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

Review Comment:
   Use `COMMON_IO_FORK_JOIN_POOL`?
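
As a rough sketch of what this suggestion could look like, the tasks can be submitted to one shared pool instead of creating a new fixed thread pool per migration. The pool below is a local stand-in defined only for this example; the name `SHARED_IO_POOL`, its size, and the class `SharedPoolSketch` are assumptions, and the actual `COMMON_IO_FORK_JOIN_POOL` constant in Paimon may be configured differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

/** Hypothetical sketch: run migrate tasks on a shared pool and collect their results. */
public class SharedPoolSketch {

    // Stand-in for a process-wide IO pool; the name and parallelism are assumptions.
    private static final ForkJoinPool SHARED_IO_POOL = new ForkJoinPool(4);

    /** Runs all tasks on the shared pool and returns their results in task order. */
    public static <T> List<T> runAll(List<Callable<T>> tasks) {
        List<T> results = new ArrayList<>();
        try {
            // invokeAll blocks until every task has completed.
            for (Future<T> future : SHARED_IO_POOL.invokeAll(tasks)) {
                // get() rethrows a task failure as an ExecutionException.
                results.add(future.get());
            }
        } catch (Exception e) {
            throw new RuntimeException("Migration task failed", e);
        }
        return results;
    }
}
```

Because the pool is shared, it is never shut down by the caller, which also removes the need to manage an `ExecutorService` lifecycle inside `executeMigrate`.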


