Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/04 14:51:10 UTC

[flink] branch master updated: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT (#20392)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bc29e574105 [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT (#20392)
bc29e574105 is described below

commit bc29e574105a2e635f8f8b9a4ad8f251dfb89321
Author: zhangmang <zh...@163.com>
AuthorDate: Thu Aug 4 22:50:59 2022 +0800

    [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT (#20392)
---
 .../operations/SqlCreateTableConverter.java        | 43 +++++++++++++++
 .../operations/SqlToOperationConverter.java        |  6 +++
 .../planner/plan/stream/sql/TableSinkTest.scala    | 12 +++++
 .../runtime/batch/sql/TableSinkITCase.scala        | 63 +++++++++++++++++++++-
 .../runtime/stream/sql/TableSinkITCase.scala       | 41 ++++++++++++++
 5 files changed, 164 insertions(+), 1 deletion(-)
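
For context: CREATE TABLE AS SELECT (CTAS) creates a table whose schema is
derived from a query and then populates it with the query's result. "Non-atomic"
means the CREATE and the subsequent INSERT are two separate steps, so a failed
insert job does not roll back the already-created table. A minimal end-to-end
sketch of the statement this commit enables (table names and connector options
are illustrative, not taken from this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CtasExample {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
            // hypothetical bounded source, just to give the CTAS something to read
            tEnv.executeSql(
                "CREATE TABLE orders (id INT, amount BIGINT) WITH ("
                    + " 'connector' = 'datagen', 'number-of-rows' = '10')");
            // the new statement: the sink schema is derived from the SELECT clause
            tEnv.executeSql(
                    "CREATE TABLE orders_copy WITH ('connector' = 'print') AS SELECT * FROM orders")
                .await();
        }
    }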

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 0e6b51d1e3e..113d42e2e8a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -19,10 +19,13 @@
 package org.apache.flink.table.planner.operations;
 
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
 import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
 import org.apache.flink.sql.parser.ddl.SqlTableLike;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -32,8 +35,10 @@ import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableASOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
@@ -82,6 +87,44 @@ class SqlCreateTableConverter {
                 sqlCreateTable.isTemporary());
     }
 
+    /** Convert the {@link SqlCreateTableAs} node. */
+    Operation convertCreateTableAS(
+            FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTableAs) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlCreateTableAs.fullTableName());
+        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        PlannerQueryOperation query =
+                (PlannerQueryOperation)
+                        SqlToOperationConverter.convert(
+                                        flinkPlanner, catalogManager, sqlCreateTableAs.getAsQuery())
+                                .orElseThrow(
+                                        () ->
+                                                new TableException(
+                                                        "CTAS unsupported node type "
+                                                                + sqlCreateTableAs
+                                                                        .getAsQuery()
+                                                                        .getClass()
+                                                                        .getSimpleName()));
+        CatalogTable catalogTable = createCatalogTable(sqlCreateTableAs);
+
+        CreateTableOperation createTableOperation =
+                new CreateTableOperation(
+                        identifier,
+                        CatalogTable.of(
+                                Schema.newBuilder()
+                                        .fromResolvedSchema(query.getResolvedSchema())
+                                        .build(),
+                                catalogTable.getComment(),
+                                catalogTable.getPartitionKeys(),
+                                catalogTable.getOptions()),
+                        sqlCreateTableAs.isIfNotExists(),
+                        sqlCreateTableAs.isTemporary());
+
+        return new CreateTableASOperation(
+                createTableOperation, Collections.emptyMap(), query, false);
+    }
+
     private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
 
         final TableSchema sourceTableSchema;
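
In effect, convertCreateTableAS derives the sink table's schema from the SELECT
query's resolved schema and wraps a plain CreateTableOperation inside a
CreateTableASOperation. Non-atomic execution is then roughly equivalent to the
two statements below (a sketch; names and options are illustrative), which is
also why a failed insert can leave the created table behind:

    // what non-atomic CTAS decomposes into, conceptually
    tEnv.executeSql(
        "CREATE TABLE my_ctas_table (a INT, b BIGINT, c STRING) WITH ("
            + " 'connector' = 'filesystem', 'format' = 'csv', 'path' = '/tmp/ctas_out')");
    // if this INSERT job fails, the table created above is not dropped
    tEnv.executeSql("INSERT INTO my_ctas_table SELECT * FROM my_source").await();
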
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 895534124ed..2e137b3ebeb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -41,6 +41,7 @@ import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
 import org.apache.flink.sql.parser.ddl.SqlCreateDatabase;
 import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
 import org.apache.flink.sql.parser.ddl.SqlCreateView;
 import org.apache.flink.sql.parser.ddl.SqlDropCatalog;
 import org.apache.flink.sql.parser.ddl.SqlDropDatabase;
@@ -299,6 +300,11 @@ public class SqlToOperationConverter {
         } else if (validated instanceof SqlUseDatabase) {
             return Optional.of(converter.convertUseDatabase((SqlUseDatabase) validated));
         } else if (validated instanceof SqlCreateTable) {
+            if (validated instanceof SqlCreateTableAs) {
+                return Optional.of(
+                        converter.createTableConverter.convertCreateTableAS(
+                                flinkPlanner, (SqlCreateTableAs) validated));
+            }
             return Optional.of(
                     converter.createTableConverter.convertCreateTable((SqlCreateTable) validated));
         } else if (validated instanceof SqlDropTable) {
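
Note the ordering above: SqlCreateTableAs is a subtype of SqlCreateTable (the
nested instanceof check only makes sense because of that), so the more specific
type must be tested first to route CTAS statements away from the existing
CREATE TABLE path. Both statements below enter the SqlCreateTable branch, but
only the second takes the new convertCreateTableAS path (names illustrative):

    // plain CREATE TABLE: handled by convertCreateTable, as before
    tEnv.executeSql("CREATE TABLE t1 (a INT) WITH ('connector' = 'datagen', 'number-of-rows' = '5')");
    // CTAS: parsed as SqlCreateTableAs and handled by convertCreateTableAS
    tEnv.executeSql("CREATE TABLE t2 WITH ('connector' = 'print') AS SELECT a FROM t1");
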
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index 69621dc9b42..d22c62fe26a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.RowData
 import org.apache.flink.table.factories.{DynamicTableFactory, DynamicTableSourceFactory}
 import org.apache.flink.table.planner.utils.{TableTestBase, TestingTableEnvironment}
 
+import org.assertj.core.api.Assertions
 import org.junit.Test
 
 import java.util
@@ -797,6 +798,17 @@ class TableSinkTest extends TableTestBase {
     stmtSet.addInsertSql("INSERT INTO zm_test(`a`) SELECT `a` FROM MyTable")
     util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    // TODO: support EXPLAIN for CreateTableASOperation
+    // Flink does not support EXPLAIN for CreateTableASOperation yet; this will be fixed in FLINK-28770.
+    Assertions
+      .assertThatThrownBy(
+        () => util.tableEnv.explainSql("CREATE TABLE zm_ctas_test AS SELECT * FROM MyTable"))
+      .hasMessage(
+        "Unsupported operation: org.apache.flink.table.operations.ddl.CreateTableASOperation")
+  }
 }
 
 /** tests table factory use ParallelSourceFunction which support parallelism by env */
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
index be481f7fa82..6ea965191b9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
@@ -20,13 +20,13 @@ package org.apache.flink.table.planner.runtime.batch.sql
 import org.apache.flink.configuration.MemorySize
 import org.apache.flink.core.testutils.FlinkMatchers
 import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory
-import org.apache.flink.table.api.config.TableConfigOptions
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{BatchAbstractTestBase, BatchTestBase}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData.smallData3
 import org.apache.flink.table.planner.utils.TableTestUtil
 
+import org.assertj.core.api.Assertions
 import org.hamcrest.MatcherAssert
 import org.junit.{Assert, Test}
 
@@ -94,4 +94,65 @@ class TableSinkITCase extends BatchTestBase {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    val dataId = TestValuesTableFactory.registerData(smallData3)
+    tEnv.executeSql(s"""
+                       |CREATE TABLE MyTable (
+                       |  `a` INT,
+                       |  `b` BIGINT,
+                       |  `c` STRING
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'bounded' = 'true',
+                       |  'data-id' = '$dataId'
+                       |)
+       """.stripMargin)
+
+    val resultPath = BatchAbstractTestBase.TEMPORARY_FOLDER.newFolder().getAbsolutePath
+    tEnv
+      .executeSql(s"""
+                     |CREATE TABLE MyCtasTable
+                     | WITH (
+                     |  'connector' = 'filesystem',
+                     |  'format' = 'testcsv',
+                     |  'path' = '$resultPath'
+                     |) AS
+                     | SELECT * FROM MyTable
+       """.stripMargin)
+      .await()
+    val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    val result = TableTestUtil.readFromFile(resultPath)
+    Assertions.assertThat(result.sorted).isEqualTo(expected.sorted)
+  }
+
+  @Test
+  def testCreateTableAsSelectWithoutOptions(): Unit = {
+    // TODO: support ManagedTable in CTAS
+    val dataId = TestValuesTableFactory.registerData(smallData3)
+    tEnv.executeSql(s"""
+                       |CREATE TABLE MyTable (
+                       |  `a` INT,
+                       |  `b` BIGINT,
+                       |  `c` STRING
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'bounded' = 'true',
+                       |  'data-id' = '$dataId'
+                       |)
+       """.stripMargin)
+
+    Assertions
+      .assertThatThrownBy(
+        () =>
+          tEnv
+            .executeSql("""
+                          |CREATE TABLE MyCtasTable
+                          | AS
+                          | SELECT * FROM MyTable
+                          |""".stripMargin)
+            .await())
+      .hasRootCauseMessage("\nExpecting actual not to be null")
+  }
 }
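
Because the non-atomic CTAS registers the table in the catalog before the
insert runs, the created table is also directly queryable once the batch job
finishes. A small follow-on sketch to the batch test above (hedged; this is
not part of the commit):

    // after the await() above returns, MyCtasTable is a regular catalog table
    tEnv.executeSql("SELECT COUNT(*) FROM MyCtasTable").print();
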
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index 08475728995..75b0d43706e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.runtime.utils._
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 
+import org.assertj.core.api.Assertions
 import org.junit.Assert.assertEquals
 import org.junit.Test
 import org.junit.runner.RunWith
@@ -192,4 +193,44 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
     )
     assertEquals(expected.sorted, result.sorted)
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    tEnv
+      .executeSql("""
+                    |CREATE TABLE MyCtasTable
+                    | WITH (
+                    |   'connector' = 'values',
+                    |   'sink-insert-only' = 'true'
+                    |) AS
+                    |  SELECT
+                    |    `person`,
+                    |    `votes`
+                    |  FROM
+                    |    src
+                    |""".stripMargin)
+      .await()
+    val actual = TestValuesTableFactory.getResults("MyCtasTable")
+    val expected = List(
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]"
+    )
+    Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted)
+  }
+
+  @Test
+  def testCreateTableAsSelectWithoutOptions(): Unit = {
+    // TODO: support ManagedTable in CTAS
+    // If the connector option is not specified, Flink will create a managed table.
+    // A managed table requires two layers of storage (log storage and file storage)
+    // and depends on the Flink Table Store; CTAS will support managed tables in the future.
+    Assertions
+      .assertThatThrownBy(
+        () => tEnv.executeSql("CREATE TABLE MyCtasTable AS SELECT `person`, `votes` FROM src"))
+      .hasMessage("You should enable the checkpointing for sinking to managed table " +
+        "'default_catalog.default_database.MyCtasTable'," +
+        " managed table relies on checkpoint to commit and the data is visible only after commit.")
+  }
 }
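
A note on the last test: the asserted error comes from the managed-table sink's
checkpointing check, since omitting the connector option makes Flink create a
managed table. Per the error message, checkpointing could be enabled as below
(a sketch; the interval value is arbitrary), though full managed-table support
for CTAS still depends on the Flink Table Store, per the TODO above:

    // "execution.checkpointing.interval" is Flink's standard checkpointing key
    tEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "10 s");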