You are viewing a plain-text version of this content. The canonical link was not preserved in this plain-text version.
Posted to commits@flink.apache.org by go...@apache.org on 2021/05/17 02:39:19 UTC

[flink] branch release-1.13 updated: [FLINK-22654][sql-parser] Fix SqlCreateTable#toString()/unparse() lose CONSTRAINTS and watermarks

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 6f4a49c  [FLINK-22654][sql-parser] Fix SqlCreateTable#toString()/unparse() lose CONSTRAINTS and watermarks
6f4a49c is described below

commit 6f4a49ce7c79a8f820c22bd797c6db13f5d0d177
Author: Terry Wang <zj...@foxmail.com>
AuthorDate: Mon May 17 10:31:29 2021 +0800

    [FLINK-22654][sql-parser] Fix SqlCreateTable#toString()/unparse() lose CONSTRAINTS and watermarks
    
    This closes #15918
    
    (cherry picked from commit cb4b4d37870fba22af122d1c31e49ce7f3ed096c)
---
 .../flink/sql/parser/ddl/SqlCreateTable.java       |  2 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 48 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 3b184e8..7549d1b 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -266,7 +266,7 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
             writer.keyword("IF NOT EXISTS");
         }
         tableName.unparse(writer, leftPrec, rightPrec);
-        if (columnList.size() > 0) {
+        if (columnList.size() > 0 || tableConstraints.size() > 0 || watermark != null) {
             SqlWriter.Frame frame =
                     writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
             for (SqlNode column : columnList) {
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index a222321..d9f523b 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -811,6 +811,35 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
     }
 
     @Test
+    public void testCreateTableWithLikeClauseWithoutColumns() {
+        final String sql =
+                ""
+                        + "create TEMPORARY table source_table (\n"
+                        + "   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n"
+                        + ") with (\n"
+                        + "  'scan.startup.mode' = 'specific-offsets',\n"
+                        + "  'scan.startup.specific-offsets' = 'partition:0,offset:1169129'\n"
+                        + ") like t_order_course (\n"
+                        + "   OVERWRITING  WATERMARKS\n"
+                        + "   OVERWRITING OPTIONS\n"
+                        + "   EXCLUDING CONSTRAINTS\n"
+                        + ")";
+        final String expected =
+                "CREATE TEMPORARY TABLE `SOURCE_TABLE` (\n"
+                        + "  WATERMARK FOR `TS` AS (`TS` - INTERVAL '5' SECOND)\n"
+                        + ") WITH (\n"
+                        + "  'scan.startup.mode' = 'specific-offsets',\n"
+                        + "  'scan.startup.specific-offsets' = 'partition:0,offset:1169129'\n"
+                        + ")\n"
+                        + "LIKE `T_ORDER_COURSE` (\n"
+                        + "  OVERWRITING WATERMARKS\n"
+                        + "  OVERWRITING OPTIONS\n"
+                        + "  EXCLUDING CONSTRAINTS\n"
+                        + ")";
+        sql(sql).ok(expected);
+    }
+
+    @Test
     public void testCreateTemporaryTable() {
         final String sql =
                 "create temporary table source_table(\n"
@@ -846,6 +875,25 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
     }
 
     @Test
+    public void testCreateTableWithOnlyWaterMark() {
+        final String sql =
+                "create table source_table (\n"
+                        + "  watermark FOR ts AS ts - interval '3' second\n"
+                        + ") with (\n"
+                        + "  'x' = 'y',\n"
+                        + "  'abc' = 'def'\n"
+                        + ")";
+        final String expected =
+                "CREATE TABLE `SOURCE_TABLE` (\n"
+                        + "  WATERMARK FOR `TS` AS (`TS` - INTERVAL '3' SECOND)\n"
+                        + ") WITH (\n"
+                        + "  'x' = 'y',\n"
+                        + "  'abc' = 'def'\n"
+                        + ")";
+        sql(sql).ok(expected);
+    }
+
+    @Test
     public void testDropTable() {
         final String sql = "DROP table catalog1.db1.tbl1";
         final String expected = "DROP TABLE `CATALOG1`.`DB1`.`TBL1`";