You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/08/04 04:18:45 UTC

[flink] branch master updated: [FLINK-18756][table-api] Support IF NOT EXISTS for CREATE TABLE statement

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8df10fa  [FLINK-18756][table-api] Support IF NOT EXISTS for CREATE TABLE statement
8df10fa is described below

commit 8df10fa589b0897071321aeb3a4db2936e69b778
Author: Leonard Xu <xb...@163.com>
AuthorDate: Tue Aug 4 12:17:04 2020 +0800

    [FLINK-18756][table-api] Support IF NOT EXISTS for CREATE TABLE statement
    
    This closes #13037
---
 .../client/gateway/local/LocalExecutorITCase.java  | 29 ++++++++++++
 .../src/main/codegen/includes/parserImpls.ftl      |  6 ++-
 .../flink/sql/parser/ddl/SqlCreateTable.java       | 29 ++----------
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 33 ++++++++++++++
 .../flink/table/api/TableEnvironmentTest.scala     | 52 ++++++++++++++++++++++
 5 files changed, 122 insertions(+), 27 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 6895c2d..ecebd5c 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -1023,6 +1023,35 @@ public class LocalExecutorITCase extends TestLogger {
 	}
 
 	@Test
+	public void testCreateTableIfNotExists() throws Exception {
+		final Executor executor = createDefaultExecutor(clusterClient);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+		String sessionId = executor.openSession(session);
+		final String ddlTemplate = "create table if not exists %s(\n" +
+			"  a int,\n" +
+			"  b bigint,\n" +
+			"  c varchar\n" +
+			") with (\n" +
+			"  'connector.type'='filesystem',\n" +
+			"  'format.type'='csv',\n" +
+			"  'connector.path'='xxx'\n" +
+			")\n";
+		try {
+			// Test create table twice.
+			executor.executeSql(sessionId, "use catalog catalog1");
+			executor.executeSql(sessionId, String.format(ddlTemplate, "MyTable1"));
+			executor.executeSql(sessionId, String.format(ddlTemplate, "MyTable1"));
+			assertShowResult(executor.executeSql(sessionId, "SHOW TABLES"), Collections.singletonList("MyTable1"));
+
+			executor.executeSql(sessionId, String.format(ddlTemplate, "MyTable2"));
+			executor.executeSql(sessionId, String.format(ddlTemplate, "MyTable2"));
+			assertShowResult(executor.executeSql(sessionId, "SHOW TABLES"), Arrays.asList("MyTable1", "MyTable2"));
+		} finally {
+			executor.closeSession(sessionId);
+		}
+	}
+
+	@Test
 	public void testCreateTableWithComputedColumn() throws Exception {
 		// only blink planner support computed column for DDL
 		Assume.assumeTrue(planner.equals("blink"));
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 105bdc8..ae73717 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -721,6 +721,7 @@ SqlNodeList TableProperties():
 SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
 {
     final SqlParserPos startPos = s.pos();
+    boolean ifNotExists = false;
     SqlIdentifier tableName;
     List<SqlTableConstraint> constraints = new ArrayList<SqlTableConstraint>();
     SqlWatermark watermark = null;
@@ -735,6 +736,8 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
 {
     <TABLE>
 
+    ifNotExists = IfNotExistsOpt()
+
     tableName = CompoundIdentifier()
     [
         <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
@@ -776,7 +779,8 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
                 watermark,
                 comment,
                 tableLike,
-                isTemporary);
+                isTemporary,
+                ifNotExists);
     }
 }
 
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index a81c275..35931a6 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -85,32 +85,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 			@Nullable SqlWatermark watermark,
 			@Nullable SqlCharStringLiteral comment,
 			@Nullable SqlTableLike tableLike,
-			boolean isTemporary
-	) {
-		this(
-			pos,
-			tableName,
-			columnList,
-			tableConstraints,
-			propertyList,
-			partitionKeyList,
-			watermark,
-			comment,
-			tableLike,
-			isTemporary,
-			false);
-	}
-
-	public SqlCreateTable(
-			SqlParserPos pos,
-			SqlIdentifier tableName,
-			SqlNodeList columnList,
-			List<SqlTableConstraint> tableConstraints,
-			SqlNodeList propertyList,
-			SqlNodeList partitionKeyList,
-			@Nullable SqlWatermark watermark,
-			@Nullable SqlCharStringLiteral comment,
-			@Nullable SqlTableLike tableLike,
 			boolean isTemporary,
 			boolean ifNotExists) {
 		super(OPERATOR, pos, false, ifNotExists);
@@ -283,6 +257,9 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 			writer.keyword("TEMPORARY");
 		}
 		writer.keyword("TABLE");
+		if (isIfNotExists()) {
+			writer.keyword("IF NOT EXISTS");
+		}
 		tableName.unparse(writer, leftPrec, rightPrec);
 		SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
 		for (SqlNode column : columnList) {
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 0c05dba..76bddcd 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -250,6 +250,39 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 	}
 
 	@Test
+	public void testCreateTableIfNotExists() {
+		final String sql = "CREATE TABLE IF NOT EXISTS tbl1 (\n" +
+			"  a bigint,\n" +
+			"  h varchar, \n" +
+			"  g as 2 * (a + 1), \n" +
+			"  ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+			"  b varchar,\n" +
+			"  proc as PROCTIME(), \n" +
+			"  PRIMARY KEY (a, b)\n" +
+			")\n" +
+			"PARTITIONED BY (a, h)\n" +
+			"  with (\n" +
+			"    'connector' = 'kafka', \n" +
+			"    'kafka.topic' = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE IF NOT EXISTS `TBL1` (\n" +
+			"  `A`  BIGINT,\n" +
+			"  `H`  VARCHAR,\n" +
+			"  `G` AS (2 * (`A` + 1)),\n" +
+			"  `TS` AS `TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss'),\n" +
+			"  `B`  VARCHAR,\n" +
+			"  `PROC` AS `PROCTIME`(),\n" +
+			"  PRIMARY KEY (`A`, `B`)\n" +
+			")\n" +
+			"PARTITIONED BY (`A`, `H`)\n" +
+			"WITH (\n" +
+			"  'connector' = 'kafka',\n" +
+			"  'kafka.topic' = 'log.test'\n" +
+			")";
+		sql(sql).ok(expected);
+	}
+
+	@Test
 	public void testCreateTableWithComment() {
 		final String sql = "CREATE TABLE tbl1 (\n" +
 				"  a bigint comment 'test column comment AAA.',\n" +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index f7731f1..5713f75 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -136,6 +136,58 @@ class TableEnvironmentTest {
   }
 
   @Test
+  def testExecuteSqlWithCreateDropTableIfNotExists(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE IF NOT EXISTS tbl1 (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    // test crate table twice
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    val tableResult2 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    assertTrue(tableEnv.getCatalog(tableEnv.getCurrentCatalog).get()
+      .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.tbl1")))
+
+    val tableResult3 = tableEnv.executeSql("DROP TABLE IF EXISTS tbl1")
+    assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
+    assertFalse(tableEnv.getCatalog(tableEnv.getCurrentCatalog).get()
+      .tableExists(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.tbl1")))
+  }
+
+  @Test
+  def testExecuteSqlWithCreateDropTemporaryTableIfNotExists(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TEMPORARY TABLE IF NOT EXISTS tbl1 (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    // test crate table twice
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    val tableResult2 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    assertTrue(tableEnv.listTables().contains("tbl1"))
+
+    val tableResult3 = tableEnv.executeSql("DROP TEMPORARY TABLE IF EXISTS tbl1")
+    assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
+    assertFalse(tableEnv.listTables().contains("tbl1"))
+  }
+
+  @Test
   def testExecuteSqlWithCreateDropTemporaryTable(): Unit = {
     val createTableStmt =
       """