You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/06 01:53:05 UTC
[flink-table-store] branch master updated: [FLINK-30255] Throw exception when names are upper case in HiveCatalog
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new cbdb78a3 [FLINK-30255] Throw exception when names are upper case in HiveCatalog
cbdb78a3 is described below
commit cbdb78a37d520e2adc208867957ce8d5134d0a09
Author: shammon <zj...@gmail.com>
AuthorDate: Fri Jan 6 09:52:59 2023 +0800
[FLINK-30255] Throw exception when names are upper case in HiveCatalog
This closes #413
---
.../flink/table/store/tests/HiveE2eTest.java | 2 +-
.../apache/flink/table/store/hive/HiveCatalog.java | 25 ++++++++
.../flink/table/store/hive/HiveCatalogITCase.java | 70 ++++++++++++++--------
3 files changed, 71 insertions(+), 26 deletions(-)
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
index 0f5ebace..729b6b2e 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
@@ -86,7 +86,7 @@ public class HiveE2eTest extends E2eReaderTestBase {
@Test
public void testFlinkWriteAndHiveRead() throws Exception {
final String warehouse = HDFS_ROOT + "/" + UUID.randomUUID().toString() + ".warehouse";
- final String table = "T";
+ final String table = "t";
runSql(
String.join(
"\n",
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 8451628a..11982c63 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -58,6 +58,7 @@ import static org.apache.flink.table.store.CatalogOptions.LOCK_ENABLED;
import static org.apache.flink.table.store.CatalogOptions.TABLE_TYPE;
import static org.apache.flink.table.store.hive.HiveCatalogLock.acquireTimeout;
import static org.apache.flink.table.store.hive.HiveCatalogLock.checkMaxSleep;
+import static org.apache.flink.util.Preconditions.checkState;
/** A catalog implementation for Hive. */
public class HiveCatalog extends AbstractCatalog {
@@ -213,6 +214,7 @@ public class HiveCatalog extends AbstractCatalog {
}
}
+ checkFieldNamesUpperCase(updateSchema.rowType().getFieldNames());
// first commit changes to underlying files
// if changes on Hive fails there is no harm to perform the same changes to files again
TableSchema schema;
@@ -269,6 +271,28 @@ public class HiveCatalog extends AbstractCatalog {
return hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
}
+ private void checkObjectPathUpperCase(ObjectPath objectPath) {
+ checkState(
+ objectPath.getDatabaseName().equals(objectPath.getDatabaseName().toLowerCase()),
+ String.format(
+ "Database name[%s] cannot contain upper case",
+ objectPath.getDatabaseName()));
+ checkState(
+ objectPath.getObjectName().equals(objectPath.getObjectName().toLowerCase()),
+ String.format(
+ "Table name[%s] cannot contain upper case", objectPath.getObjectName()));
+ }
+
+ private void checkFieldNamesUpperCase(List<String> fieldNames) {
+ List<String> illegalFieldNames =
+ fieldNames.stream()
+ .filter(f -> !f.equals(f.toLowerCase()))
+ .collect(Collectors.toList());
+ checkState(
+ illegalFieldNames.isEmpty(),
+ String.format("Field names %s cannot contain upper case", illegalFieldNames));
+ }
+
private Database convertToDatabase(String name) {
Database database = new Database();
database.setName(name);
@@ -370,6 +394,7 @@ public class HiveCatalog extends AbstractCatalog {
}
private SchemaManager schemaManager(ObjectPath tablePath) {
+ checkObjectPathUpperCase(tablePath);
return new SchemaManager(getTableLocation(tablePath)).withLock(lock(tablePath));
}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
index ba11f599..3ac047b4 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
@@ -53,6 +53,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_IN_TEST;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TXN_MANAGER;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
/** IT cases for {@link HiveCatalog}. */
@RunWith(FlinkEmbeddedHiveRunner.class)
@@ -143,10 +144,10 @@ public class HiveCatalogITCase {
// drop non-empty database
tEnv.executeSql("CREATE DATABASE test_db2").await();
tEnv.executeSql("USE test_db2").await();
- tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
.await();
- tEnv.executeSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')").await();
- Path tablePath = new Path(path, "test_db2.db/T");
+ tEnv.executeSql("INSERT INTO t VALUES (1, 'Hi'), (2, 'Hello')").await();
+ Path tablePath = new Path(path, "test_db2.db/t");
Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
try {
tEnv.executeSql("DROP DATABASE test_db2").await();
@@ -164,37 +165,37 @@ public class HiveCatalogITCase {
@Test
public void testTableOperations() throws Exception {
// create table
- tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
.await();
- tEnv.executeSql("CREATE TABLE S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ tEnv.executeSql("CREATE TABLE s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
.await();
Assert.assertEquals(Arrays.asList(Row.of("s"), Row.of("t")), collect("SHOW TABLES"));
tEnv.executeSql(
- "CREATE TABLE IF NOT EXISTS S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ "CREATE TABLE IF NOT EXISTS s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
.await();
try {
- tEnv.executeSql("CREATE TABLE S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ tEnv.executeSql("CREATE TABLE s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
.await();
Assert.fail("No exception is thrown");
} catch (Throwable t) {
ExceptionUtils.assertThrowableWithMessage(
- t, "Table (or view) test_db.S already exists in Catalog my_hive");
+ t, "Table (or view) test_db.s already exists in Catalog my_hive");
}
// drop table
- tEnv.executeSql("INSERT INTO S VALUES (1, 'Hi'), (2, 'Hello')").await();
- Path tablePath = new Path(path, "test_db.db/S");
+ tEnv.executeSql("INSERT INTO s VALUES (1, 'Hi'), (2, 'Hello')").await();
+ Path tablePath = new Path(path, "test_db.db/s");
Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
- tEnv.executeSql("DROP TABLE S").await();
+ tEnv.executeSql("DROP TABLE s").await();
Assert.assertEquals(Collections.singletonList(Row.of("t")), collect("SHOW TABLES"));
Assert.assertFalse(tablePath.getFileSystem().exists(tablePath));
- tEnv.executeSql("DROP TABLE IF EXISTS S").await();
+ tEnv.executeSql("DROP TABLE IF EXISTS s").await();
try {
- tEnv.executeSql("DROP TABLE S").await();
+ tEnv.executeSql("DROP TABLE s").await();
Assert.fail("No exception is thrown");
} catch (Throwable t) {
ExceptionUtils.assertThrowableWithMessage(
- t, "Table with identifier 'my_hive.test_db.S' does not exist");
+ t, "Table with identifier 'my_hive.test_db.s' does not exist");
}
try {
tEnv.executeSql("DROP TABLE hive_table").await();
@@ -205,8 +206,8 @@ public class HiveCatalogITCase {
}
// alter table
- tEnv.executeSql("ALTER TABLE T SET ( 'manifest.target-file-size' = '16MB' )").await();
- List<Row> actual = collect("SHOW CREATE TABLE T");
+ tEnv.executeSql("ALTER TABLE t SET ( 'manifest.target-file-size' = '16MB' )").await();
+ List<Row> actual = collect("SHOW CREATE TABLE t");
Assert.assertEquals(1, actual.size());
Assert.assertTrue(
actual.get(0)
@@ -214,11 +215,11 @@ public class HiveCatalogITCase {
.toString()
.contains("'manifest.target-file-size' = '16MB'"));
try {
- tEnv.executeSql("ALTER TABLE S SET ( 'manifest.target-file-size' = '16MB' )").await();
+ tEnv.executeSql("ALTER TABLE s SET ( 'manifest.target-file-size' = '16MB' )").await();
Assert.fail("No exception is thrown");
} catch (Throwable t) {
ExceptionUtils.assertThrowableWithMessage(
- t, "Table `my_hive`.`test_db`.`S` doesn't exist or is a temporary table");
+ t, "Table `my_hive`.`test_db`.`s` doesn't exist or is a temporary table");
}
try {
tEnv.executeSql("ALTER TABLE hive_table SET ( 'manifest.target-file-size' = '16MB' )")
@@ -246,22 +247,22 @@ public class HiveCatalogITCase {
.await();
tEnv.executeSql("USE CATALOG my_hive_external").await();
tEnv.executeSql("USE test_db").await();
- tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
.await();
Assert.assertTrue(
hiveShell
.executeQuery("DESC FORMATTED t")
.contains("Table Type: \tEXTERNAL_TABLE \tNULL"));
- tEnv.executeSql("DROP TABLE T").await();
- Path tablePath = new Path(path, "test_db.db/T");
+ tEnv.executeSql("DROP TABLE t").await();
+ Path tablePath = new Path(path, "test_db.db/t");
Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
}
@Test
public void testFlinkWriteAndHiveRead() throws Exception {
- tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
.await();
- tEnv.executeSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')").await();
+ tEnv.executeSql("INSERT INTO t VALUES (1, 'Hi'), (2, 'Hello')").await();
Assert.assertEquals(
Arrays.asList("1\tHi", "2\tHello"),
hiveShell.executeQuery("SELECT * FROM t ORDER BY a"));
@@ -277,7 +278,7 @@ public class HiveCatalogITCase {
@Test
public void testHiveLock() throws InterruptedException {
- tEnv.executeSql("CREATE TABLE T (a INT)");
+ tEnv.executeSql("CREATE TABLE t (a INT)");
CatalogLock.Factory lockFactory =
((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get())
.catalog()
@@ -300,7 +301,7 @@ public class HiveCatalogITCase {
CatalogLock lock = lockFactory.create();
for (int j = 0; j < 10; j++) {
try {
- lock.runWithLock("test_db", "T", unsafeIncrement);
+ lock.runWithLock("test_db", "t", unsafeIncrement);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -317,6 +318,25 @@ public class HiveCatalogITCase {
assertThat(count.get()).isEqualTo(100);
}
+ @Test
+ public void testUpperCase() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ .await())
+ .hasRootCauseMessage(
+ String.format("Table name[%s] cannot contain upper case", "T"));
+
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "CREATE TABLE t (A INT, b STRING, C STRING) WITH ( 'file.format' = 'avro')")
+ .await())
+ .hasRootCauseMessage(
+ String.format("Field names %s cannot contain upper case", "[A, C]"));
+ }
+
private List<Row> collect(String sql) throws Exception {
List<Row> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {