You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/18 04:43:13 UTC

[incubator-paimon] 32/32: [core] Rename paimon: hive meta should be compatible

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 340bf0010e380ee32125fe4a4d135a4c4cdb6609
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Sat Mar 18 11:06:45 2023 +0800

    [core] Rename paimon: hive meta should be compatible
---
 .../apache/paimon/tests/FileStoreBatchE2eTest.java | 12 ++---
 .../tests/FileStoreBuiltInFormatE2eTest.java       |  6 +--
 .../java/org/apache/paimon/tests/HiveE2eTest.java  | 11 ++---
 .../apache/flink/table/hive/LegacyHiveClasses.java | 36 +++++++++++++++
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 51 ++++++++++++----------
 .../store/hive/TableStoreHiveStorageHandler.java   | 25 +++++++++++
 .../flink/table/store/hive/TableStoreSerDe.java    | 25 +++++++++++
 .../table/store/mapred/TableStoreInputFormat.java  | 25 +++++++++++
 .../table/store/mapred/TableStoreOutputFormat.java | 25 +++++++++++
 9 files changed, 176 insertions(+), 40 deletions(-)

diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBatchE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBatchE2eTest.java
index aed5b0686..5bb1ff085 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBatchE2eTest.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBatchE2eTest.java
@@ -78,7 +78,7 @@ public class FileStoreBatchE2eTest extends E2eTestBase {
 
         String useCatalogCmd = "USE CATALOG ts_catalog;";
 
-        String tableStoreDdl =
+        String paimonDdl =
                 "CREATE TABLE IF NOT EXISTS ts_table (\n"
                         + "    dt VARCHAR,\n"
                         + "    hr VARCHAR,\n"
@@ -98,14 +98,14 @@ public class FileStoreBatchE2eTest extends E2eTestBase {
                 catalogDdl,
                 useCatalogCmd,
                 testDataSourceDdl,
-                tableStoreDdl);
+                paimonDdl);
 
         // test #1: read all data from paimon
         runSql(
                 "INSERT INTO result1 SELECT * FROM ts_table;",
                 catalogDdl,
                 useCatalogCmd,
-                tableStoreDdl,
+                paimonDdl,
                 createResultSink(
                         "result1",
                         "dt VARCHAR, hr VARCHAR, person VARCHAR, category VARCHAR, price INT"));
@@ -133,7 +133,7 @@ public class FileStoreBatchE2eTest extends E2eTestBase {
                 "INSERT INTO result2 SELECT * FROM ts_table WHERE dt > '20211110' AND hr < '09';",
                 catalogDdl,
                 useCatalogCmd,
-                tableStoreDdl,
+                paimonDdl,
                 createResultSink(
                         "result2",
                         "dt VARCHAR, hr VARCHAR, person VARCHAR, category VARCHAR, price INT"));
@@ -149,7 +149,7 @@ public class FileStoreBatchE2eTest extends E2eTestBase {
                 "INSERT INTO result3 SELECT * FROM ts_table WHERE person = 'Alice' AND category = 'Food';",
                 catalogDdl,
                 useCatalogCmd,
-                tableStoreDdl,
+                paimonDdl,
                 createResultSink(
                         "result3",
                         "dt VARCHAR, hr VARCHAR, person VARCHAR, category VARCHAR, price INT"));
@@ -166,7 +166,7 @@ public class FileStoreBatchE2eTest extends E2eTestBase {
                         + "INSERT INTO result4 SELECT dt, category, sum(price) AS total FROM ts_table GROUP BY dt, category;",
                 catalogDdl,
                 useCatalogCmd,
-                tableStoreDdl,
+                paimonDdl,
                 createResultSink("result4", "dt VARCHAR, hr VARCHAR, total INT"));
         checkResult(
                 "20211110, Drink, 200",
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBuiltInFormatE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBuiltInFormatE2eTest.java
index 3f171737f..9425c3042 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBuiltInFormatE2eTest.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreBuiltInFormatE2eTest.java
@@ -51,7 +51,7 @@ public class FileStoreBuiltInFormatE2eTest extends E2eTestBase {
                         TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store");
 
         String useCatalogCmd = "USE CATALOG ts_catalog;";
-        String tableStoreDdl =
+        String paimonDdl =
                 "CREATE TABLE IF NOT EXISTS ts_table (\n"
                         + schema
                         + ") WITH (\n"
@@ -92,13 +92,13 @@ public class FileStoreBuiltInFormatE2eTest extends E2eTestBase {
                         + "DATE '2022-05-23'"
                         + ")";
 
-        runSql(insertDml, catalogDdl, useCatalogCmd, tableStoreDdl);
+        runSql(insertDml, catalogDdl, useCatalogCmd, paimonDdl);
 
         runSql(
                 "INSERT INTO result1 SELECT * FROM ts_table where id > 1;",
                 catalogDdl,
                 useCatalogCmd,
-                tableStoreDdl,
+                paimonDdl,
                 createResultSink("result1", schema));
         checkResult(
                 "2, "
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/HiveE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/HiveE2eTest.java
index 8155306ca..20b139d26 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/HiveE2eTest.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/HiveE2eTest.java
@@ -57,8 +57,8 @@ public class HiveE2eTest extends E2eReaderTestBase {
     @Test
     public void testReadExternalTable() throws Exception {
         final String table = "table_store_pk";
-        String tableStorePkPath = HDFS_ROOT + "/" + UUID.randomUUID() + ".store";
-        String tableStorePkDdl =
+        String paimonPkPath = HDFS_ROOT + "/" + UUID.randomUUID() + ".store";
+        String paimonPkDdl =
                 String.format(
                         "CREATE TABLE IF NOT EXISTS %s (\n"
                                 + "  a int,\n"
@@ -69,17 +69,14 @@ public class HiveE2eTest extends E2eReaderTestBase {
                                 + "  'bucket' = '2'\n"
                                 + ");",
                         table);
-        runSql(
-                createInsertSql(table),
-                createCatalogSql("table_store", tableStorePkPath),
-                tableStorePkDdl);
+        runSql(createInsertSql(table), createCatalogSql("table_store", paimonPkPath), paimonPkDdl);
 
         String externalTablePkDdl =
                 String.format(
                         "CREATE EXTERNAL TABLE IF NOT EXISTS %s\n"
                                 + "STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'\n"
                                 + "LOCATION '%s/default.db/%s';\n",
-                        table, tableStorePkPath, table);
+                        table, paimonPkPath, table);
 
         checkQueryResults(table, this::executeQuery, externalTablePkDdl);
     }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/flink/table/hive/LegacyHiveClasses.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/flink/table/hive/LegacyHiveClasses.java
new file mode 100644
index 000000000..0316f3f1d
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/flink/table/hive/LegacyHiveClasses.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.hive;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+
+/** Legacy hive classes of table store 0.3. */
+@Deprecated
+public class LegacyHiveClasses {
+
+    private static final String LEGACY_INPUT_FORMAT_CLASS_NAME =
+            "org.apache.flink.table.store.mapred.TableStoreInputFormat";
+    private static final String LEGACY_OUTPUT_FORMAT_CLASS_NAME =
+            "org.apache.flink.table.store.mapred.TableStoreOutputFormat";
+
+    public static boolean isPaimonTable(Table table) {
+        return LEGACY_INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat())
+                && LEGACY_OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat());
+    }
+}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 286d9908d..c7888860e 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.hive;
 
+import org.apache.flink.table.hive.LegacyHiveClasses;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -166,7 +168,7 @@ public class HiveCatalog extends AbstractCatalog {
                                 // tables.
                                 // so we just check the schema file first
                                 return schemaFileExists(identifier)
-                                        && tableStoreTableExists(identifier, false);
+                                        && paimonTableExists(identifier, false);
                             })
                     .collect(Collectors.toList());
         } catch (UnknownDBException e) {
@@ -178,7 +180,7 @@ public class HiveCatalog extends AbstractCatalog {
 
     @Override
     public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
-        if (!tableStoreTableExists(identifier)) {
+        if (!paimonTableExists(identifier)) {
             throw new TableNotExistException(identifier);
         }
         Path tableLocation = getDataTableLocation(identifier);
@@ -191,7 +193,7 @@ public class HiveCatalog extends AbstractCatalog {
     public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
             throws TableNotExistException {
         checkNotSystemTable(identifier, "dropTable");
-        if (!tableStoreTableExists(identifier)) {
+        if (!paimonTableExists(identifier)) {
             if (ignoreIfNotExists) {
                 return;
             } else {
@@ -215,7 +217,7 @@ public class HiveCatalog extends AbstractCatalog {
         if (!databaseExists(databaseName)) {
             throw new DatabaseNotExistException(databaseName);
         }
-        if (tableStoreTableExists(identifier)) {
+        if (paimonTableExists(identifier)) {
             if (ignoreIfExists) {
                 return;
             } else {
@@ -256,7 +258,7 @@ public class HiveCatalog extends AbstractCatalog {
             throws TableNotExistException, TableAlreadyExistException {
         checkNotSystemTable(fromTable, "renameTable");
         checkNotSystemTable(toTable, "renameTable");
-        if (!tableStoreTableExists(fromTable)) {
+        if (!paimonTableExists(fromTable)) {
             if (ignoreIfNotExists) {
                 return;
             } else {
@@ -264,7 +266,7 @@ public class HiveCatalog extends AbstractCatalog {
             }
         }
 
-        if (tableStoreTableExists(toTable)) {
+        if (paimonTableExists(toTable)) {
             throw new TableAlreadyExistException(toTable);
         }
 
@@ -285,7 +287,7 @@ public class HiveCatalog extends AbstractCatalog {
             Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
             throws TableNotExistException {
         checkNotSystemTable(identifier, "alterTable");
-        if (!tableStoreTableExists(identifier)) {
+        if (!paimonTableExists(identifier)) {
             if (ignoreIfNotExists) {
                 return;
             } else {
@@ -414,15 +416,15 @@ public class HiveCatalog extends AbstractCatalog {
                 dataField.description());
     }
 
-    private boolean tableStoreTableExists(Identifier identifier) {
-        return tableStoreTableExists(identifier, true);
+    private boolean paimonTableExists(Identifier identifier) {
+        return paimonTableExists(identifier, true);
     }
 
     private boolean schemaFileExists(Identifier identifier) {
         return new SchemaManager(fileIO, getDataTableLocation(identifier)).latest().isPresent();
     }
 
-    private boolean tableStoreTableExists(Identifier identifier, boolean throwException) {
+    private boolean paimonTableExists(Identifier identifier, boolean throwException) {
         Table table;
         try {
             table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
@@ -434,21 +436,22 @@ public class HiveCatalog extends AbstractCatalog {
                     e);
         }
 
-        if (!INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat())
-                || !OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat())) {
-            if (throwException) {
-                throw new IllegalArgumentException(
-                        "Table "
-                                + identifier.getFullName()
-                                + " is not a paimon table. It's input format is "
-                                + table.getSd().getInputFormat()
-                                + " and its output format is "
-                                + table.getSd().getOutputFormat());
-            } else {
-                return false;
-            }
+        boolean isPaimonTable = isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table);
+        if (!isPaimonTable && throwException) {
+            throw new IllegalArgumentException(
+                    "Table "
+                            + identifier.getFullName()
+                            + " is not a paimon table. It's input format is "
+                            + table.getSd().getInputFormat()
+                            + " and its output format is "
+                            + table.getSd().getOutputFormat());
         }
-        return true;
+        return isPaimonTable;
+    }
+
+    private static boolean isPaimonTable(Table table) {
+        return INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat())
+                && OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat());
     }
 
     private SchemaManager schemaManager(Identifier identifier) {
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
new file mode 100644
index 000000000..b1336d536
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.paimon.hive.PaimonStorageHandler;
+
+/** A {@link PaimonStorageHandler} to be compatible with table store 0.3. */
+@Deprecated
+public class TableStoreHiveStorageHandler extends PaimonStorageHandler {}
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
new file mode 100644
index 000000000..c2fe9b98e
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.paimon.hive.PaimonSerDe;
+
+/** A {@link PaimonSerDe} to be compatible with table store 0.3. */
+@Deprecated
+public class TableStoreSerDe extends PaimonSerDe {}
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
new file mode 100644
index 000000000..28d83ff84
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.mapred;
+
+import org.apache.paimon.hive.mapred.PaimonInputFormat;
+
+/** A {@link PaimonInputFormat} to be compatible with table store 0.3. */
+@Deprecated
+public class TableStoreInputFormat extends PaimonInputFormat {}
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java
new file mode 100644
index 000000000..052196841
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.mapred;
+
+import org.apache.paimon.hive.mapred.PaimonOutputFormat;
+
+/** A {@link PaimonOutputFormat} to be compatible with table store 0.3. */
+@Deprecated
+public class TableStoreOutputFormat extends PaimonOutputFormat {}