You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/04 14:51:10 UTC
[flink] branch master updated: [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT (#20392)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bc29e574105 [FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT (#20392)
bc29e574105 is described below
commit bc29e574105a2e635f8f8b9a4ad8f251dfb89321
Author: zhangmang <zh...@163.com>
AuthorDate: Thu Aug 4 22:50:59 2022 +0800
[FLINK-28459][table-planner] Supports non-atomic CREATE TABLE AS SELECT (#20392)
---
.../operations/SqlCreateTableConverter.java | 43 +++++++++++++++
.../operations/SqlToOperationConverter.java | 6 +++
.../planner/plan/stream/sql/TableSinkTest.scala | 12 +++++
.../runtime/batch/sql/TableSinkITCase.scala | 63 +++++++++++++++++++++-
.../runtime/stream/sql/TableSinkITCase.scala | 41 ++++++++++++++
5 files changed, 164 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 0e6b51d1e3e..113d42e2e8a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -19,10 +19,13 @@
package org.apache.flink.table.planner.operations;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
import org.apache.flink.sql.parser.ddl.SqlTableLike;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogManager;
@@ -32,8 +35,10 @@ import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableASOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
@@ -82,6 +87,44 @@ class SqlCreateTableConverter {
sqlCreateTable.isTemporary());
}
+ /** Convert the {@link SqlCreateTableAs} node. */
+ Operation convertCreateTableAS(
+ FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTableAs) {
+ UnresolvedIdentifier unresolvedIdentifier =
+ UnresolvedIdentifier.of(sqlCreateTableAs.fullTableName());
+ ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+ PlannerQueryOperation query =
+ (PlannerQueryOperation)
+ SqlToOperationConverter.convert(
+ flinkPlanner, catalogManager, sqlCreateTableAs.getAsQuery())
+ .orElseThrow(
+ () ->
+ new TableException(
+ "CTAS unsupported node type "
+ + sqlCreateTableAs
+ .getAsQuery()
+ .getClass()
+ .getSimpleName()));
+ CatalogTable catalogTable = createCatalogTable(sqlCreateTableAs);
+
+ CreateTableOperation createTableOperation =
+ new CreateTableOperation(
+ identifier,
+ CatalogTable.of(
+ Schema.newBuilder()
+ .fromResolvedSchema(query.getResolvedSchema())
+ .build(),
+ catalogTable.getComment(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getOptions()),
+ sqlCreateTableAs.isIfNotExists(),
+ sqlCreateTableAs.isTemporary());
+
+ return new CreateTableASOperation(
+ createTableOperation, Collections.emptyMap(), query, false);
+ }
+
private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
final TableSchema sourceTableSchema;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 895534124ed..2e137b3ebeb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -41,6 +41,7 @@ import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
import org.apache.flink.sql.parser.ddl.SqlCreateDatabase;
import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
import org.apache.flink.sql.parser.ddl.SqlCreateView;
import org.apache.flink.sql.parser.ddl.SqlDropCatalog;
import org.apache.flink.sql.parser.ddl.SqlDropDatabase;
@@ -299,6 +300,11 @@ public class SqlToOperationConverter {
} else if (validated instanceof SqlUseDatabase) {
return Optional.of(converter.convertUseDatabase((SqlUseDatabase) validated));
} else if (validated instanceof SqlCreateTable) {
+ if (validated instanceof SqlCreateTableAs) {
+ return Optional.of(
+ converter.createTableConverter.convertCreateTableAS(
+ flinkPlanner, (SqlCreateTableAs) validated));
+ }
return Optional.of(
converter.createTableConverter.convertCreateTable((SqlCreateTable) validated));
} else if (validated instanceof SqlDropTable) {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index 69621dc9b42..d22c62fe26a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.RowData
import org.apache.flink.table.factories.{DynamicTableFactory, DynamicTableSourceFactory}
import org.apache.flink.table.planner.utils.{TableTestBase, TestingTableEnvironment}
+import org.assertj.core.api.Assertions
import org.junit.Test
import java.util
@@ -797,6 +798,17 @@ class TableSinkTest extends TableTestBase {
stmtSet.addInsertSql("INSERT INTO zm_test(`a`) SELECT `a` FROM MyTable")
util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
}
+
+ @Test
+ def testCreateTableAsSelect(): Unit = {
+ // TODO: support explain CreateTableASOperation
+ // Flink does not support explain CreateTableASOperation yet, we will fix it in FLINK-28770.
+ Assertions
+ .assertThatThrownBy(
+ () => util.tableEnv.explainSql("CREATE TABLE zm_ctas_test AS SELECT * FROM MyTable"))
+ .hasMessage(
+ "Unsupported operation: org.apache.flink.table.operations.ddl.CreateTableASOperation")
+ }
}
/** tests table factory use ParallelSourceFunction which support parallelism by env */
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
index be481f7fa82..6ea965191b9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
@@ -20,13 +20,13 @@ package org.apache.flink.table.planner.runtime.batch.sql
import org.apache.flink.configuration.MemorySize
import org.apache.flink.core.testutils.FlinkMatchers
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory
-import org.apache.flink.table.api.config.TableConfigOptions
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.{BatchAbstractTestBase, BatchTestBase}
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.TestData.smallData3
import org.apache.flink.table.planner.utils.TableTestUtil
+import org.assertj.core.api.Assertions
import org.hamcrest.MatcherAssert
import org.junit.{Assert, Test}
@@ -94,4 +94,65 @@ class TableSinkITCase extends BatchTestBase {
tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb"))
checkResult("SELECT 1", Seq(row(1)))
}
+
+ @Test
+ def testCreateTableAsSelect(): Unit = {
+ val dataId = TestValuesTableFactory.registerData(smallData3)
+ tEnv.executeSql(s"""
+ |CREATE TABLE MyTable (
+ | `a` INT,
+ | `b` BIGINT,
+ | `c` STRING
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'true',
+ | 'data-id' = '$dataId'
+ |)
+ """.stripMargin)
+
+ val resultPath = BatchAbstractTestBase.TEMPORARY_FOLDER.newFolder().getAbsolutePath
+ tEnv
+ .executeSql(s"""
+ |CREATE TABLE MyCtasTable
+ | WITH (
+ | 'connector' = 'filesystem',
+ | 'format' = 'testcsv',
+ | 'path' = '$resultPath'
+ |) AS
+ | SELECT * FROM MyTable
+ """.stripMargin)
+ .await()
+ val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+ val result = TableTestUtil.readFromFile(resultPath)
+ Assertions.assertThat(result.sorted).isEqualTo(expected.sorted)
+ }
+
+ @Test
+ def testCreateTableAsSelectWithoutOptions(): Unit = {
+ // TODO CTAS supports ManagedTable
+ val dataId = TestValuesTableFactory.registerData(smallData3)
+ tEnv.executeSql(s"""
+ |CREATE TABLE MyTable (
+ | `a` INT,
+ | `b` BIGINT,
+ | `c` STRING
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'true',
+ | 'data-id' = '$dataId'
+ |)
+ """.stripMargin)
+
+ Assertions
+ .assertThatThrownBy(
+ () =>
+ tEnv
+ .executeSql("""
+ |CREATE TABLE MyCtasTable
+ | AS
+ | SELECT * FROM MyTable
+ |""".stripMargin)
+ .await())
+ .hasRootCauseMessage("\nExpecting actual not to be null")
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index 08475728995..75b0d43706e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.runtime.utils._
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.assertj.core.api.Assertions
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
@@ -192,4 +193,44 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
)
assertEquals(expected.sorted, result.sorted)
}
+
+ @Test
+ def testCreateTableAsSelect(): Unit = {
+ tEnv
+ .executeSql("""
+ |CREATE TABLE MyCtasTable
+ | WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'true'
+ |) AS
+ | SELECT
+ | `person`,
+ | `votes`
+ | FROM
+ | src
+ |""".stripMargin)
+ .await()
+ val actual = TestValuesTableFactory.getResults("MyCtasTable")
+ val expected = List(
+ "+I[jason, 1]",
+ "+I[jason, 1]",
+ "+I[jason, 1]",
+ "+I[jason, 1]"
+ )
+ Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted)
+ }
+
+ @Test
+ def testCreateTableAsSelectWithoutOptions(): Unit = {
+ // TODO: CTAS supports ManagedTable
+ // If the connector option is not specified, Flink will create a Managed table.
+ // Managed table requires two layers of log storage and file storage
+ // and depends on the flink table store, CTAS will support Managed Table in the future.
+ Assertions
+ .assertThatThrownBy(
+ () => tEnv.executeSql("CREATE TABLE MyCtasTable AS SELECT `person`, `votes` FROM src"))
+ .hasMessage("You should enable the checkpointing for sinking to managed table " +
+ "'default_catalog.default_database.MyCtasTable'," +
+ " managed table relies on checkpoint to commit and the data is visible only after commit.")
+ }
}