You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/06 01:53:05 UTC

[flink-table-store] branch master updated: [FLINK-30255] Throw exception when names are upper case in HiveCatalog

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new cbdb78a3 [FLINK-30255] Throw exception when names are upper case in HiveCatalog
cbdb78a3 is described below

commit cbdb78a37d520e2adc208867957ce8d5134d0a09
Author: shammon <zj...@gmail.com>
AuthorDate: Fri Jan 6 09:52:59 2023 +0800

    [FLINK-30255] Throw exception when names are upper case in HiveCatalog
    
    This closes #413
---
 .../flink/table/store/tests/HiveE2eTest.java       |  2 +-
 .../apache/flink/table/store/hive/HiveCatalog.java | 25 ++++++++
 .../flink/table/store/hive/HiveCatalogITCase.java  | 70 ++++++++++++++--------
 3 files changed, 71 insertions(+), 26 deletions(-)

diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
index 0f5ebace..729b6b2e 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
@@ -86,7 +86,7 @@ public class HiveE2eTest extends E2eReaderTestBase {
     @Test
     public void testFlinkWriteAndHiveRead() throws Exception {
         final String warehouse = HDFS_ROOT + "/" + UUID.randomUUID().toString() + ".warehouse";
-        final String table = "T";
+        final String table = "t";
         runSql(
                 String.join(
                         "\n",
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 8451628a..11982c63 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -58,6 +58,7 @@ import static org.apache.flink.table.store.CatalogOptions.LOCK_ENABLED;
 import static org.apache.flink.table.store.CatalogOptions.TABLE_TYPE;
 import static org.apache.flink.table.store.hive.HiveCatalogLock.acquireTimeout;
 import static org.apache.flink.table.store.hive.HiveCatalogLock.checkMaxSleep;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /** A catalog implementation for Hive. */
 public class HiveCatalog extends AbstractCatalog {
@@ -213,6 +214,7 @@ public class HiveCatalog extends AbstractCatalog {
             }
         }
 
+        checkFieldNamesUpperCase(updateSchema.rowType().getFieldNames());
         // first commit changes to underlying files
         // if changes on Hive fails there is no harm to perform the same changes to files again
         TableSchema schema;
@@ -269,6 +271,28 @@ public class HiveCatalog extends AbstractCatalog {
         return hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
     }
 
+    private void checkObjectPathUpperCase(ObjectPath objectPath) {
+        checkState(
+                objectPath.getDatabaseName().equals(objectPath.getDatabaseName().toLowerCase()),
+                String.format(
+                        "Database name[%s] cannot contain upper case",
+                        objectPath.getDatabaseName()));
+        checkState(
+                objectPath.getObjectName().equals(objectPath.getObjectName().toLowerCase()),
+                String.format(
+                        "Table name[%s] cannot contain upper case", objectPath.getObjectName()));
+    }
+
+    private void checkFieldNamesUpperCase(List<String> fieldNames) {
+        List<String> illegalFieldNames =
+                fieldNames.stream()
+                        .filter(f -> !f.equals(f.toLowerCase()))
+                        .collect(Collectors.toList());
+        checkState(
+                illegalFieldNames.isEmpty(),
+                String.format("Field names %s cannot contain upper case", illegalFieldNames));
+    }
+
     private Database convertToDatabase(String name) {
         Database database = new Database();
         database.setName(name);
@@ -370,6 +394,7 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     private SchemaManager schemaManager(ObjectPath tablePath) {
+        checkObjectPathUpperCase(tablePath);
         return new SchemaManager(getTableLocation(tablePath)).withLock(lock(tablePath));
     }
 
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
index ba11f599..3ac047b4 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
@@ -53,6 +53,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_IN_TEST;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TXN_MANAGER;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 /** IT cases for {@link HiveCatalog}. */
 @RunWith(FlinkEmbeddedHiveRunner.class)
@@ -143,10 +144,10 @@ public class HiveCatalogITCase {
         // drop non-empty database
         tEnv.executeSql("CREATE DATABASE test_db2").await();
         tEnv.executeSql("USE test_db2").await();
-        tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+        tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
                 .await();
-        tEnv.executeSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')").await();
-        Path tablePath = new Path(path, "test_db2.db/T");
+        tEnv.executeSql("INSERT INTO t VALUES (1, 'Hi'), (2, 'Hello')").await();
+        Path tablePath = new Path(path, "test_db2.db/t");
         Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
         try {
             tEnv.executeSql("DROP DATABASE test_db2").await();
@@ -164,37 +165,37 @@ public class HiveCatalogITCase {
     @Test
     public void testTableOperations() throws Exception {
         // create table
-        tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+        tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
                 .await();
-        tEnv.executeSql("CREATE TABLE S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+        tEnv.executeSql("CREATE TABLE s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
                 .await();
         Assert.assertEquals(Arrays.asList(Row.of("s"), Row.of("t")), collect("SHOW TABLES"));
         tEnv.executeSql(
-                        "CREATE TABLE IF NOT EXISTS S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                        "CREATE TABLE IF NOT EXISTS s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
                 .await();
         try {
-            tEnv.executeSql("CREATE TABLE S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+            tEnv.executeSql("CREATE TABLE s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
                     .await();
             Assert.fail("No exception is thrown");
         } catch (Throwable t) {
             ExceptionUtils.assertThrowableWithMessage(
-                    t, "Table (or view) test_db.S already exists in Catalog my_hive");
+                    t, "Table (or view) test_db.s already exists in Catalog my_hive");
         }
 
         // drop table
-        tEnv.executeSql("INSERT INTO S VALUES (1, 'Hi'), (2, 'Hello')").await();
-        Path tablePath = new Path(path, "test_db.db/S");
+        tEnv.executeSql("INSERT INTO s VALUES (1, 'Hi'), (2, 'Hello')").await();
+        Path tablePath = new Path(path, "test_db.db/s");
         Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
-        tEnv.executeSql("DROP TABLE S").await();
+        tEnv.executeSql("DROP TABLE s").await();
         Assert.assertEquals(Collections.singletonList(Row.of("t")), collect("SHOW TABLES"));
         Assert.assertFalse(tablePath.getFileSystem().exists(tablePath));
-        tEnv.executeSql("DROP TABLE IF EXISTS S").await();
+        tEnv.executeSql("DROP TABLE IF EXISTS s").await();
         try {
-            tEnv.executeSql("DROP TABLE S").await();
+            tEnv.executeSql("DROP TABLE s").await();
             Assert.fail("No exception is thrown");
         } catch (Throwable t) {
             ExceptionUtils.assertThrowableWithMessage(
-                    t, "Table with identifier 'my_hive.test_db.S' does not exist");
+                    t, "Table with identifier 'my_hive.test_db.s' does not exist");
         }
         try {
             tEnv.executeSql("DROP TABLE hive_table").await();
@@ -205,8 +206,8 @@ public class HiveCatalogITCase {
         }
 
         // alter table
-        tEnv.executeSql("ALTER TABLE T SET ( 'manifest.target-file-size' = '16MB' )").await();
-        List<Row> actual = collect("SHOW CREATE TABLE T");
+        tEnv.executeSql("ALTER TABLE t SET ( 'manifest.target-file-size' = '16MB' )").await();
+        List<Row> actual = collect("SHOW CREATE TABLE t");
         Assert.assertEquals(1, actual.size());
         Assert.assertTrue(
                 actual.get(0)
@@ -214,11 +215,11 @@ public class HiveCatalogITCase {
                         .toString()
                         .contains("'manifest.target-file-size' = '16MB'"));
         try {
-            tEnv.executeSql("ALTER TABLE S SET ( 'manifest.target-file-size' = '16MB' )").await();
+            tEnv.executeSql("ALTER TABLE s SET ( 'manifest.target-file-size' = '16MB' )").await();
             Assert.fail("No exception is thrown");
         } catch (Throwable t) {
             ExceptionUtils.assertThrowableWithMessage(
-                    t, "Table `my_hive`.`test_db`.`S` doesn't exist or is a temporary table");
+                    t, "Table `my_hive`.`test_db`.`s` doesn't exist or is a temporary table");
         }
         try {
             tEnv.executeSql("ALTER TABLE hive_table SET ( 'manifest.target-file-size' = '16MB' )")
@@ -246,22 +247,22 @@ public class HiveCatalogITCase {
                 .await();
         tEnv.executeSql("USE CATALOG my_hive_external").await();
         tEnv.executeSql("USE test_db").await();
-        tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+        tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
                 .await();
         Assert.assertTrue(
                 hiveShell
                         .executeQuery("DESC FORMATTED t")
                         .contains("Table Type:         \tEXTERNAL_TABLE      \tNULL"));
-        tEnv.executeSql("DROP TABLE T").await();
-        Path tablePath = new Path(path, "test_db.db/T");
+        tEnv.executeSql("DROP TABLE t").await();
+        Path tablePath = new Path(path, "test_db.db/t");
         Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
     }
 
     @Test
     public void testFlinkWriteAndHiveRead() throws Exception {
-        tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+        tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
                 .await();
-        tEnv.executeSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')").await();
+        tEnv.executeSql("INSERT INTO t VALUES (1, 'Hi'), (2, 'Hello')").await();
         Assert.assertEquals(
                 Arrays.asList("1\tHi", "2\tHello"),
                 hiveShell.executeQuery("SELECT * FROM t ORDER BY a"));
@@ -277,7 +278,7 @@ public class HiveCatalogITCase {
 
     @Test
     public void testHiveLock() throws InterruptedException {
-        tEnv.executeSql("CREATE TABLE T (a INT)");
+        tEnv.executeSql("CREATE TABLE t (a INT)");
         CatalogLock.Factory lockFactory =
                 ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get())
                         .catalog()
@@ -300,7 +301,7 @@ public class HiveCatalogITCase {
                                 CatalogLock lock = lockFactory.create();
                                 for (int j = 0; j < 10; j++) {
                                     try {
-                                        lock.runWithLock("test_db", "T", unsafeIncrement);
+                                        lock.runWithLock("test_db", "t", unsafeIncrement);
                                     } catch (Exception e) {
                                         throw new RuntimeException(e);
                                     }
@@ -317,6 +318,25 @@ public class HiveCatalogITCase {
         assertThat(count.get()).isEqualTo(100);
     }
 
+    @Test
+    public void testUpperCase() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                "CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                                        .await())
+                .hasRootCauseMessage(
+                        String.format("Table name[%s] cannot contain upper case", "T"));
+
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                "CREATE TABLE t (A INT, b STRING, C STRING) WITH ( 'file.format' = 'avro')")
+                                        .await())
+                .hasRootCauseMessage(
+                        String.format("Field names %s cannot contain upper case", "[A, C]"));
+    }
+
     private List<Row> collect(String sql) throws Exception {
         List<Row> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {