You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/08/04 04:18:45 UTC
[flink] branch master updated: [FLINK-18756][table-api] Support IF
NOT EXISTS for CREATE TABLE statement
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8df10fa [FLINK-18756][table-api] Support IF NOT EXISTS for CREATE TABLE statement
8df10fa is described below
commit 8df10fa589b0897071321aeb3a4db2936e69b778
Author: Leonard Xu <xb...@163.com>
AuthorDate: Tue Aug 4 12:17:04 2020 +0800
[FLINK-18756][table-api] Support IF NOT EXISTS for CREATE TABLE statement
This closes #13037
---
.../client/gateway/local/LocalExecutorITCase.java | 29 ++++++++++++
.../src/main/codegen/includes/parserImpls.ftl | 6 ++-
.../flink/sql/parser/ddl/SqlCreateTable.java | 29 ++----------
.../flink/sql/parser/FlinkSqlParserImplTest.java | 33 ++++++++++++++
.../flink/table/api/TableEnvironmentTest.scala | 52 ++++++++++++++++++++++
5 files changed, 122 insertions(+), 27 deletions(-)
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 6895c2d..ecebd5c 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -1023,6 +1023,35 @@ public class LocalExecutorITCase extends TestLogger {
}
@Test
+ public void testCreateTableIfNotExists() throws Exception {
+ final Executor executor = createDefaultExecutor(clusterClient);
+ final SessionContext session = new SessionContext("test-session", new Environment());
+ String sessionId = executor.openSession(session);
+ final String ddlTemplate = "create table if not exists %s(\n" +
+ " a int,\n" +
+ " b bigint,\n" +
+ " c varchar\n" +
+ ") with (\n" +
+ " 'connector.type'='filesystem',\n" +
+ " 'format.type'='csv',\n" +
+ " 'connector.path'='xxx'\n" +
+ ")\n";
+ try {
+ // Test create table twice.
+ executor.executeSql(sessionId, "use catalog catalog1");
+ executor.executeSql(sessionId, String.format(ddlTemplate, "MyTable1"));
+ executor.executeSql(sessionId, String.format(ddlTemplate, "MyTable1"));
+ assertShowResult(executor.executeSql(sessionId, "SHOW TABLES"), Collections.singletonList("MyTable1"));
+
+ executor.executeSql(sessionId, String.format(ddlTemplate, "MyTable2"));
+ executor.executeSql(sessionId, String.format(ddlTemplate, "MyTable2"));
+ assertShowResult(executor.executeSql(sessionId, "SHOW TABLES"), Arrays.asList("MyTable1", "MyTable2"));
+ } finally {
+ executor.closeSession(sessionId);
+ }
+ }
+
+ @Test
public void testCreateTableWithComputedColumn() throws Exception {
// only blink planner support computed column for DDL
Assume.assumeTrue(planner.equals("blink"));
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 105bdc8..ae73717 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -721,6 +721,7 @@ SqlNodeList TableProperties():
SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
{
final SqlParserPos startPos = s.pos();
+ boolean ifNotExists = false;
SqlIdentifier tableName;
List<SqlTableConstraint> constraints = new ArrayList<SqlTableConstraint>();
SqlWatermark watermark = null;
@@ -735,6 +736,8 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
{
<TABLE>
+ ifNotExists = IfNotExistsOpt()
+
tableName = CompoundIdentifier()
[
<LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
@@ -776,7 +779,8 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
watermark,
comment,
tableLike,
- isTemporary);
+ isTemporary,
+ ifNotExists);
}
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index a81c275..35931a6 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -85,32 +85,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
@Nullable SqlTableLike tableLike,
- boolean isTemporary
- ) {
- this(
- pos,
- tableName,
- columnList,
- tableConstraints,
- propertyList,
- partitionKeyList,
- watermark,
- comment,
- tableLike,
- isTemporary,
- false);
- }
-
- public SqlCreateTable(
- SqlParserPos pos,
- SqlIdentifier tableName,
- SqlNodeList columnList,
- List<SqlTableConstraint> tableConstraints,
- SqlNodeList propertyList,
- SqlNodeList partitionKeyList,
- @Nullable SqlWatermark watermark,
- @Nullable SqlCharStringLiteral comment,
- @Nullable SqlTableLike tableLike,
boolean isTemporary,
boolean ifNotExists) {
super(OPERATOR, pos, false, ifNotExists);
@@ -283,6 +257,9 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
writer.keyword("TEMPORARY");
}
writer.keyword("TABLE");
+ if (isIfNotExists()) {
+ writer.keyword("IF NOT EXISTS");
+ }
tableName.unparse(writer, leftPrec, rightPrec);
SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
for (SqlNode column : columnList) {
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 0c05dba..76bddcd 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -250,6 +250,39 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
}
@Test
+ public void testCreateTableIfNotExists() {
+ final String sql = "CREATE TABLE IF NOT EXISTS tbl1 (\n" +
+ " a bigint,\n" +
+ " h varchar, \n" +
+ " g as 2 * (a + 1), \n" +
+ " ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+ " b varchar,\n" +
+ " proc as PROCTIME(), \n" +
+ " PRIMARY KEY (a, b)\n" +
+ ")\n" +
+ "PARTITIONED BY (a, h)\n" +
+ " with (\n" +
+ " 'connector' = 'kafka', \n" +
+ " 'kafka.topic' = 'log.test'\n" +
+ ")\n";
+ final String expected = "CREATE TABLE IF NOT EXISTS `TBL1` (\n" +
+ " `A` BIGINT,\n" +
+ " `H` VARCHAR,\n" +
+ " `G` AS (2 * (`A` + 1)),\n" +
+ " `TS` AS `TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss'),\n" +
+ " `B` VARCHAR,\n" +
+ " `PROC` AS `PROCTIME`(),\n" +
+ " PRIMARY KEY (`A`, `B`)\n" +
+ ")\n" +
+ "PARTITIONED BY (`A`, `H`)\n" +
+ "WITH (\n" +
+ " 'connector' = 'kafka',\n" +
+ " 'kafka.topic' = 'log.test'\n" +
+ ")";
+ sql(sql).ok(expected);
+ }
+
+ @Test
public void testCreateTableWithComment() {
final String sql = "CREATE TABLE tbl1 (\n" +
" a bigint comment 'test column comment AAA.',\n" +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index f7731f1..5713f75 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -136,6 +136,58 @@ class TableEnvironmentTest {
}
@Test
+ def testExecuteSqlWithCreateDropTableIfNotExists(): Unit = {
+ val createTableStmt =
+ """
+ |CREATE TABLE IF NOT EXISTS tbl1 (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ // test create table twice
+ val tableResult1 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+ val tableResult2 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+ assertTrue(tableEnv.getCatalog(tableEnv.getCurrentCatalog).get()
+ .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.tbl1")))
+
+ val tableResult3 = tableEnv.executeSql("DROP TABLE IF EXISTS tbl1")
+ assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
+ assertFalse(tableEnv.getCatalog(tableEnv.getCurrentCatalog).get()
+ .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.tbl1")))
+ }
+
+ @Test
+ def testExecuteSqlWithCreateDropTemporaryTableIfNotExists(): Unit = {
+ val createTableStmt =
+ """
+ |CREATE TEMPORARY TABLE IF NOT EXISTS tbl1 (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ // test create table twice
+ val tableResult1 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+ val tableResult2 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+ assertTrue(tableEnv.listTables().contains("tbl1"))
+
+ val tableResult3 = tableEnv.executeSql("DROP TEMPORARY TABLE IF EXISTS tbl1")
+ assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
+ assertFalse(tableEnv.listTables().contains("tbl1"))
+ }
+
+ @Test
def testExecuteSqlWithCreateDropTemporaryTable(): Unit = {
val createTableStmt =
"""