Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/29 13:22:04 UTC

[GitHub] [flink] Tartarus0zm opened a new pull request, #20392: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT

Tartarus0zm opened a new pull request, #20392:
URL: https://github.com/apache/flink/pull/20392

   ## What is the purpose of the change
   
   * Supports non-atomic CREATE TABLE AS SELECT
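
   For context, a minimal sketch of the statement shape this enables, mirroring the streaming ITCase added in this PR (the table name and `values` connector options are illustrative). Broadly, "non-atomic" means the target table is created before the insert job runs, so a failed job can leave the created table behind instead of rolling it back:

   ```scala
   // Non-atomic CTAS: a single statement that creates the table and then runs
   // an INSERT INTO ... SELECT job into it. Names and options are illustrative.
   tEnv
     .executeSql("""
                   |CREATE TABLE MyCtasTable WITH (
                   |  'connector' = 'values',
                   |  'sink-insert-only' = 'true'
                   |) AS SELECT `person`, `votes` FROM src
                   |""".stripMargin)
     .await() // block until the insert job finishes
   ```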
   
   
   ## Brief change log
   
   Modify the SqlCreateTableConverter and SqlToOperationConverter to support non-atomic CREATE TABLE AS SELECT
   
   
   ## Verifying this change
   
   Added ITCases:
   * org.apache.flink.table.planner.runtime.batch.sql.TableSinkITCase#testCreateTableAsSelect
   * org.apache.flink.table.planner.runtime.stream.sql.TableSinkITCase#testCreateTableAsSelect
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20392: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20392:
URL: https://github.com/apache/flink/pull/20392#discussion_r934198540


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {

Review Comment:
   Please also test the case when the `connector` option is not specified. Moreover, could we add a test for managed tables?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala:
##########
@@ -192,4 +193,30 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
     )
     assertEquals(expected.sorted, result.sorted)
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala:
##########
@@ -192,4 +193,30 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
     )
     assertEquals(expected.sorted, result.sorted)
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    tEnv
+      .executeSql("""
+                    |CREATE TABLE MyCtasTable
+                    | WITH (
+                    |   'connector' = 'values',
+                    |   'sink-insert-only' = 'true'
+                    |) AS
+                    |  SELECT
+                    |    `person`,
+                    |    `votes`
+                    |  FROM
+                    |    src
+                    |""".stripMargin)
+      .await()
+    val actual = TestValuesTableFactory.getResults("MyCtasTable")
+    val expected = List(
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]"
+    )
+    Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted)

Review Comment:
   Why not follow the other tests and use `Assert.assertEquals(expected.sorted, result1.sorted)` directly?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java:
##########
@@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
                 sqlCreateTable.isTemporary());
     }
 
+    /** Convert the {@link SqlCreateTableAs} node. */
+    Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTable) {
+        sqlCreateTable.getTableConstraints().forEach(validateTableConstraint);
+
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
+        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        PlannerQueryOperation query =
+                (PlannerQueryOperation)
+                        SqlToOperationConverter.convert(
+                                        flinkPlanner, catalogManager, sqlCreateTable.getAsQuery())
+                                .orElseThrow(
+                                        () ->
+                                                new TableException(
+                                                        "CTAS Unsupported node type "

Review Comment:
   ```suggestion
                                                           "CTAS unsupported node type "
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java:
##########
@@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
                 sqlCreateTable.isTemporary());
     }
 
+    /** Convert the {@link SqlCreateTableAs} node. */
+    Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTable) {
+        sqlCreateTable.getTableConstraints().forEach(validateTableConstraint);
+
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
+        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        PlannerQueryOperation query =
+                (PlannerQueryOperation)
+                        SqlToOperationConverter.convert(
+                                        flinkPlanner, catalogManager, sqlCreateTable.getAsQuery())
+                                .orElseThrow(
+                                        () ->
+                                                new TableException(
+                                                        "CTAS Unsupported node type "
+                                                                + sqlCreateTable
+                                                                        .getAsQuery()
+                                                                        .getClass()
+                                                                        .getSimpleName()));
+        Map<String, String> properties = new HashMap<>();

Review Comment:
   Here we can reuse the `createCatalogTable` method.
   ```
   UnresolvedIdentifier unresolvedIdentifier =
                   UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
   ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
   CatalogTable catalogTable = createCatalogTable(sqlCreateTable);
   
   CreateTableOperation createTableOperation =
                   new CreateTableOperation(
                           identifier,
                        CatalogTable.of(Schema.newBuilder().fromResolvedSchema(query.getResolvedSchema()).build(),
                                   catalogTable.getComment(),
                                   catalogTable.getPartitionKeys(),
                                   catalogTable.getOptions()),
                           sqlCreateTable.isIfNotExists(),
                           sqlCreateTable.isTemporary());
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java:
##########
@@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
                 sqlCreateTable.isTemporary());
     }
 
+    /** Convert the {@link SqlCreateTableAs} node. */
+    Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTable) {
+        sqlCreateTable.getTableConstraints().forEach(validateTableConstraint);

Review Comment:
   This validation is not needed; we have already validated it in the `SqlCreateTableAs#validate` method.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
+

Review Comment:
   Please also add plan-related tests in `TableSinkTest`, and consider testing the case when the `connector` option is not specified. Moreover, could we add a test for managed tables?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    val dataId = TestValuesTableFactory.registerData(smallData3)
+    tEnv.executeSql(s"""
+                       |CREATE TABLE MyTable (
+                       |  `a` INT,
+                       |  `b` BIGINT,
+                       |  `c` STRING
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'bounded' = 'true',
+                       |  'data-id' = '$dataId'
+                       |)
+       """.stripMargin)
+
+    val resultPath = BatchAbstractTestBase.TEMPORARY_FOLDER.newFolder().getAbsolutePath
+    tEnv
+      .executeSql(s"""
+                     |CREATE TABLE MyCtasTable
+                     | WITH (
+                     |  'connector' = 'filesystem',
+                     |  'format' = 'testcsv',
+                     |  'path' = '$resultPath'
+                     |) AS
+                     | SELECT * FROM MyTable
+       """.stripMargin)
+      .await()
+    val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    val result = TableTestUtil.readFromFile(resultPath)
+    Assertions.assertThat(result.sorted).isEqualTo(expected.sorted)

Review Comment:
   Why not follow the other tests and use `Assert.assertEquals(expected.sorted, result1.sorted)` directly?





[GitHub] [flink] wuchong merged pull request #20392: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT

Posted by GitBox <gi...@apache.org>.
wuchong merged PR #20392:
URL: https://github.com/apache/flink/pull/20392




[GitHub] [flink] lsyldliu commented on a diff in pull request #20392: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20392:
URL: https://github.com/apache/flink/pull/20392#discussion_r937274957


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala:
##########
@@ -192,4 +193,41 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
     )
     assertEquals(expected.sorted, result.sorted)
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    tEnv
+      .executeSql("""
+                    |CREATE TABLE MyCtasTable
+                    | WITH (
+                    |   'connector' = 'values',
+                    |   'sink-insert-only' = 'true'
+                    |) AS
+                    |  SELECT
+                    |    `person`,
+                    |    `votes`
+                    |  FROM
+                    |    src
+                    |""".stripMargin)
+      .await()
+    val actual = TestValuesTableFactory.getResults("MyCtasTable")
+    val expected = List(
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]"
+    )
+    Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted)
+  }
+
+  @Test
+  def testCreateTableAsSelectWithoutOptions(): Unit = {
+    // TODO CTAS supports ManagedTable
+    Assertions

Review Comment:
   Could you also explain the reason?





[GitHub] [flink] Tartarus0zm commented on a diff in pull request #20392: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT

Posted by GitBox <gi...@apache.org>.
Tartarus0zm commented on code in PR #20392:
URL: https://github.com/apache/flink/pull/20392#discussion_r937356970


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala:
##########
@@ -192,4 +193,41 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
     )
     assertEquals(expected.sorted, result.sorted)
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    tEnv
+      .executeSql("""
+                    |CREATE TABLE MyCtasTable
+                    | WITH (
+                    |   'connector' = 'values',
+                    |   'sink-insert-only' = 'true'
+                    |) AS
+                    |  SELECT
+                    |    `person`,
+                    |    `votes`
+                    |  FROM
+                    |    src
+                    |""".stripMargin)
+      .await()
+    val actual = TestValuesTableFactory.getResults("MyCtasTable")
+    val expected = List(
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]"
+    )
+    Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted)
+  }
+
+  @Test
+  def testCreateTableAsSelectWithoutOptions(): Unit = {
+    // TODO CTAS supports ManagedTable
+    Assertions

Review Comment:
   If the `connector` option is not specified, Flink creates a managed table.
   A managed table relies on checkpoints to commit, and its data is visible only after a commit.
   Managed tables require two storage layers (log storage and file storage) and depend on the Flink Table Store; CTAS will support managed tables in the future.
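
   As a rough sketch of the negative test this implies (the exception type and the table/column names here are assumptions for illustration, not taken from this PR): omitting the `WITH` clause routes CTAS to a managed table, which should currently be rejected.

   ```scala
   // Illustrative only; assumes `import org.junit.Assert` and
   // `org.apache.flink.table.api.TableException` are in scope. The exact
   // exception type and message are assumptions: CTAS without a 'connector'
   // option targets a managed table, which is not supported yet, so we expect
   // the statement to fail.
   Assert.assertThrows(
     classOf[TableException],
     () => tEnv.executeSql("CREATE TABLE MyCtasManagedTable AS SELECT `person`, `votes` FROM src"))
   ```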





[GitHub] [flink] Tartarus0zm commented on a diff in pull request #20392: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT

Posted by GitBox <gi...@apache.org>.
Tartarus0zm commented on code in PR #20392:
URL: https://github.com/apache/flink/pull/20392#discussion_r934463064


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala:
##########
@@ -192,4 +193,30 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
     )
     assertEquals(expected.sorted, result.sorted)
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    tEnv
+      .executeSql("""
+                    |CREATE TABLE MyCtasTable
+                    | WITH (
+                    |   'connector' = 'values',
+                    |   'sink-insert-only' = 'true'
+                    |) AS
+                    |  SELECT
+                    |    `person`,
+                    |    `votes`
+                    |  FROM
+                    |    src
+                    |""".stripMargin)
+      .await()
+    val actual = TestValuesTableFactory.getResults("MyCtasTable")
+    val expected = List(
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]"
+    )
+    Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted)

Review Comment:
   Please refer to this coding specification: [Flink code style](https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-mockito---use-reusable-test-implementations)
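
   For comparison, the two assertion styles under discussion, side by side (the values are placeholders):

   ```scala
   // JUnit 4 style used by the existing ITCases in this file:
   assertEquals(expected.sorted, result.sorted)
   // AssertJ style used by the new CTAS tests:
   Assertions.assertThat(result.sorted).isEqualTo(expected.sorted)
   ```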
   



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    val dataId = TestValuesTableFactory.registerData(smallData3)
+    tEnv.executeSql(s"""
+                       |CREATE TABLE MyTable (
+                       |  `a` INT,
+                       |  `b` BIGINT,
+                       |  `c` STRING
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'bounded' = 'true',
+                       |  'data-id' = '$dataId'
+                       |)
+       """.stripMargin)
+
+    val resultPath = BatchAbstractTestBase.TEMPORARY_FOLDER.newFolder().getAbsolutePath
+    tEnv
+      .executeSql(s"""
+                     |CREATE TABLE MyCtasTable
+                     | WITH (
+                     |  'connector' = 'filesystem',
+                     |  'format' = 'testcsv',
+                     |  'path' = '$resultPath'
+                     |) AS
+                     | SELECT * FROM MyTable
+       """.stripMargin)
+      .await()
+    val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    val result = TableTestUtil.readFromFile(resultPath)
+    Assertions.assertThat(result.sorted).isEqualTo(expected.sorted)

Review Comment:
   ditto





[GitHub] [flink] flinkbot commented on pull request #20392: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20392:
URL: https://github.com/apache/flink/pull/20392#issuecomment-1199286421

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "58af2ce4595e98313a1b7c3bdfd50cd26aea1802",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "58af2ce4595e98313a1b7c3bdfd50cd26aea1802",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 58af2ce4595e98313a1b7c3bdfd50cd26aea1802 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] Tartarus0zm commented on pull request #20392: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT

Posted by GitBox <gi...@apache.org>.
Tartarus0zm commented on PR #20392:
URL: https://github.com/apache/flink/pull/20392#issuecomment-1200084615

   @flinkbot run azure




[GitHub] [flink] Tartarus0zm commented on a diff in pull request #20392: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT

Posted by GitBox <gi...@apache.org>.
Tartarus0zm commented on code in PR #20392:
URL: https://github.com/apache/flink/pull/20392#discussion_r935195557


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
+

Review Comment:
   `explain` does not support `CreateTableASOperation` yet.
   [FLINK-28770](https://issues.apache.org/jira/browse/FLINK-28770) will follow up to resolve this.





[GitHub] [flink] Tartarus0zm commented on a diff in pull request #20392: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT

Posted by GitBox <gi...@apache.org>.
Tartarus0zm commented on code in PR #20392:
URL: https://github.com/apache/flink/pull/20392#discussion_r935239348


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {

Review Comment:
   Added a TODO in the UT `testCreateTableAsSelectWithoutOptions`.


