Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/08 12:57:29 UTC
[flink] 02/05: [FLINK-17267] [table-planner] legacy batch planner supports explain insert operation
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2fc82ad5b92367c9f9a2aad5e6df3fd78108754e
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Apr 24 20:45:12 2020 +0800
[FLINK-17267] [table-planner] legacy batch planner supports explain insert operation
---
.../table/tests/test_table_environment_api.py | 5 +-
.../table/api/internal/BatchTableEnvImpl.scala | 191 +++++++++++++++++----
.../flink/table/api/internal/TableEnvImpl.scala | 63 ++++++-
.../table/plan/nodes/dataset/DataSetSink.scala | 57 ++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 3 +-
.../table/plan/rules/dataSet/DataSetSinkRule.scala | 55 ++++++
.../apache/flink/table/api/batch/ExplainTest.scala | 71 ++++++--
.../sql/validation/InsertIntoValidationTest.scala | 8 +
.../validation/InsertIntoValidationTest.scala | 4 +
.../src/test/scala/resources/testInsert1.out | 27 +++
.../test/scala/resources/testMultipleInserts1.out | 51 ++++++
11 files changed, 477 insertions(+), 58 deletions(-)
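
A minimal sketch of what the change enables, adapted from the new ExplainTest case further down (CommonTestData and MemoryTableSourceSinkUtil are the test utilities used there):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.runtime.utils.CommonTestData
    import org.apache.flink.table.utils.MemoryTableSourceSinkUtil

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = BatchTableEnvironment.create(env)

    // register a CSV source and an in-memory sink table
    tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
    val fieldNames = Array("d", "e")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
    tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))

    // buffer an INSERT; translation happens later
    tEnv.sqlUpdate("insert into targetTable select first, id from sourceTable")

    // previously threw a TableException in the legacy batch planner, now returns the plan
    println(tEnv.explain(false))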
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index c1bf04a..87c8023 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -400,8 +400,9 @@ class BatchTableEnvironmentTests(TableEnvironmentTest, PyFlinkBatchTableTestCase
t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
- with self.assertRaises(TableException):
- t_env.explain(extended=True)
+ actual = t_env.explain(extended=True)
+
+ assert isinstance(actual, str)
def test_create_table_environment(self):
table_config = TableConfig()
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 343fc23..e25b8e4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -38,8 +38,9 @@ import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{DataSetQueryOperation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, QueryOperation}
import org.apache.flink.table.plan.BatchOptimizer
+import org.apache.flink.table.plan.nodes.LogicalSink
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.planner.Conversions
import org.apache.flink.table.runtime.MapRunner
@@ -74,7 +75,7 @@ abstract class BatchTableEnvImpl(
moduleManager: ModuleManager)
extends TableEnvImpl(config, catalogManager, moduleManager) {
- private val bufferedSinks = new JArrayList[DataSink[_]]
+ private val bufferedModifyOperations = new JArrayList[ModifyOperation]()
private[flink] val optimizer = new BatchOptimizer(
() => config.getPlannerConfig.unwrap(classOf[CalciteConfig]).orElse(CalciteConfig.DEFAULT),
@@ -170,8 +171,8 @@ abstract class BatchTableEnvImpl(
}
}
- override protected def addToBuffer(sink: DataSink[_]): Unit = {
- bufferedSinks.add(sink)
+ override protected def addToBuffer[T](modifyOperation: ModifyOperation): Unit = {
+ bufferedModifyOperations.add(modifyOperation)
}
/**
@@ -215,32 +216,69 @@ abstract class BatchTableEnvImpl(
* @param extended Flag to include detailed optimizer estimates.
*/
private[flink] def explain(table: Table, extended: Boolean): String = {
- val ast = getRelBuilder.tableOperation(table.getQueryOperation).build()
- val optimizedPlan = optimizer.optimize(ast)
- val dataSet = translate[Row](
- optimizedPlan,
- getTableSchema(table.getQueryOperation.getTableSchema.getFieldNames, optimizedPlan))(
- new GenericTypeInfo(classOf[Row]))
- dataSet.output(new DiscardingOutputFormat[Row])
- val env = dataSet.getExecutionEnvironment
+ explain(JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), extended)
+ }
+
+ override def explain(table: Table): String = explain(table: Table, extended = false)
+
+ override def explain(extended: Boolean): String = {
+ explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
+ }
+
+ private def explain(operations: JList[Operation], extended: Boolean): String = {
+ require(operations.asScala.nonEmpty, "operations should not be empty")
+ val astList = operations.asScala.map {
+ case queryOperation: QueryOperation =>
+ getRelBuilder.tableOperation(queryOperation).build()
+ case modifyOperation: ModifyOperation =>
+ translateToRel(modifyOperation, addLogicalSink = true)
+ case o => throw new TableException(s"Unsupported operation: ${o.asSummaryString()}")
+ }
+
+ val optimizedNodes = astList.map(optimizer.optimize)
+
+ val batchTableEnv = createDummyBatchTableEnv()
+ val dataSinks = optimizedNodes.zip(operations.asScala).map {
+ case (optimizedNode, operation) =>
+ operation match {
+ case queryOperation: QueryOperation =>
+ val dataSet = translate[Row](
+ optimizedNode,
+ getTableSchema(queryOperation.getTableSchema.getFieldNames, optimizedNode))(
+ new GenericTypeInfo(classOf[Row]))
+ dataSet.output(new DiscardingOutputFormat[Row])
+ case modifyOperation: ModifyOperation =>
+ val tableSink = getTableSink(modifyOperation)
+ translate(
+ batchTableEnv,
+ optimizedNode,
+ tableSink,
+ getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+ case o =>
+ throw new TableException("Unsupported Operation: " + o.asSummaryString())
+ }
+ }
+
+ val astPlan = astList.map(RelOptUtil.toString).mkString(System.lineSeparator)
+ val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator)
+
+ val env = dataSinks.head.getDataSet.getExecutionEnvironment
val jasonSqlPlan = env.getExecutionPlan
val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
s"== Abstract Syntax Tree ==" +
- System.lineSeparator +
- s"${RelOptUtil.toString(ast)}" +
- System.lineSeparator +
- s"== Optimized Logical Plan ==" +
- System.lineSeparator +
- s"${RelOptUtil.toString(optimizedPlan)}" +
- System.lineSeparator +
- s"== Physical Execution Plan ==" +
- System.lineSeparator +
- s"$sqlPlan"
+ System.lineSeparator +
+ s"$astPlan" +
+ System.lineSeparator +
+ s"== Optimized Logical Plan ==" +
+ System.lineSeparator +
+ s"$optimizedPlan" +
+ System.lineSeparator +
+ s"== Physical Execution Plan ==" +
+ System.lineSeparator +
+ s"$sqlPlan"
}
- override def explain(table: Table): String = explain(table: Table, extended = false)
-
override def execute(jobName: String): JobExecutionResult = {
val plan = createPipelineAndClearBuffer(jobName)
@@ -292,10 +330,6 @@ abstract class BatchTableEnvImpl(
createPipelineAndClearBuffer(jobName)
}
- override def explain(extended: Boolean): String = {
- throw new TableException("This method is unsupported in old planner.")
- }
-
/**
* Translate the buffered sinks to Plan, and clear the buffer.
*
@@ -304,10 +338,11 @@ abstract class BatchTableEnvImpl(
* If the buffer is not clear after failure, the following `translate` will also fail.
*/
private def createPipelineAndClearBuffer(jobName: String): Pipeline = {
+ val dataSinks = translate(bufferedModifyOperations)
try {
- createPipeline(bufferedSinks, jobName)
+ createPipeline(dataSinks, jobName)
} finally {
- bufferedSinks.clear()
+ bufferedModifyOperations.clear()
}
}
@@ -357,6 +392,102 @@ abstract class BatchTableEnvImpl(
}
/**
+ * Translates a [[ModifyOperation]] into a [[RelNode]].
+ *
+ * The transformation does not involve optimizing the relational expression tree.
+ *
+ * @param modifyOperation The root ModifyOperation of the relational expression tree.
+ * @param addLogicalSink Whether to add [[LogicalSink]] as the root.
+ * Currently, LogicalSink is only used for explaining.
+ * @return The [[RelNode]] that corresponds to the translated [[ModifyOperation]].
+ */
+ private def translateToRel(modifyOperation: ModifyOperation, addLogicalSink: Boolean): RelNode = {
+ val input = getRelBuilder.tableOperation(modifyOperation.getChild).build()
+ if (addLogicalSink) {
+ val tableSink = getTableSink(modifyOperation)
+ modifyOperation match {
+ case s: CatalogSinkModifyOperation =>
+ LogicalSink.create(input, tableSink, s.getTableIdentifier.toString)
+ case o =>
+ throw new TableException("Unsupported Operation: " + o.asSummaryString())
+ }
+ } else {
+ input
+ }
+ }
+
+ /**
+ * Translates a list of [[ModifyOperation]] into a list of [[DataSink]].
+ *
+ * The transformation involves optimizing the relational expression tree as defined by
+ * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
+ *
+ * @param modifyOperations The root [[ModifyOperation]]s of the relational expression tree.
+ * @return The [[DataSink]]s that correspond to the translated [[ModifyOperation]]s.
+ */
+ private def translate[T](modifyOperations: JList[ModifyOperation]): JList[DataSink[_]] = {
+ val relNodes = modifyOperations.asScala.map(o => translateToRel(o, addLogicalSink = false))
+ val optimizedNodes = relNodes.map(optimizer.optimize)
+
+ val batchTableEnv = createDummyBatchTableEnv()
+ modifyOperations.asScala.zip(optimizedNodes).map {
+ case (modifyOperation, optimizedNode) =>
+ val tableSink = getTableSink(modifyOperation)
+ translate(
+ batchTableEnv,
+ optimizedNode,
+ tableSink,
+ getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+ }.asJava
+ }
+
+ /**
+ * Translates an optimized [[RelNode]] into a [[DataSet]]
+ * and hands it over to the [[TableSink]] to write it.
+ *
+ * @param optimizedNode The [[RelNode]] to translate.
+ * @param tableSink The [[TableSink]] to write the [[Table]] to.
+ * @return The [[DataSink]] that corresponds to the [[RelNode]] and the [[TableSink]].
+ */
+ private def translate[T](
+ batchTableEnv: BatchTableEnvImpl,
+ optimizedNode: RelNode,
+ tableSink: TableSink[T],
+ tableSchema: TableSchema): DataSink[_] = {
+ tableSink match {
+ case batchSink: BatchTableSink[T] =>
+ val outputType = fromDataTypeToLegacyInfo(tableSink.getConsumedDataType)
+ .asInstanceOf[TypeInformation[T]]
+ // translate the Table into a DataSet and provide the type that the TableSink expects.
+ val result: DataSet[T] = translate(optimizedNode, tableSchema)(outputType)
+ // create a dummy NoOpOperator, which holds a DummyExecutionEnvironment as context.
+ // NoOpOperator will be ignored in OperatorTranslation
+ // when translating DataSet to Operator, while its input can be translated normally.
+ val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType)
+ // Give the DataSet to the TableSink to emit it.
+ batchSink.consumeDataSet(dummyOp)
+ case boundedSink: OutputFormatTableSink[T] =>
+ val outputType = fromDataTypeToLegacyInfo(tableSink.getConsumedDataType)
+ .asInstanceOf[TypeInformation[T]]
+ // translate the Table into a DataSet and provide the type that the TableSink expects.
+ val result: DataSet[T] = translate(optimizedNode, tableSchema)(outputType)
+ // create a dummy NoOpOperator, which holds DummyExecutionEnvironment as context.
+ // NoOpOperator will be ignored in OperatorTranslation
+ // when translating DataSet to Operator, while its input can be translated normally.
+ val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType)
+ // use the OutputFormat to consume the DataSet.
+ val dataSink = dummyOp.output(boundedSink.getOutputFormat)
+ dataSink.name(
+ TableConnectorUtils.generateRuntimeName(
+ boundedSink.getClass,
+ boundedSink.getTableSchema.getFieldNames))
+ case _ =>
+ throw new TableException(
+ "BatchTableSink or OutputFormatTableSink required to emit batch Table.")
+ }
+ }
+
+ /**
* Translates a [[Table]] into a [[DataSet]].
*
* The transformation involves optimizing the relational expression tree as defined by
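
Note that the rewritten BatchTableEnvImpl now serves all explain entry points through the shared explain(operations, extended) path above. Roughly, assuming the tEnv from the sketch near the top of this mail and some query Table:

    // explain a single query Table (public behavior unchanged)
    tEnv.explain(table)
    // extended optimizer estimates, exposed on the internal implementation as before
    tEnv.asInstanceOf[BatchTableEnvironmentImpl].explain(table, extended = true)
    // explain the buffered INSERT operations; this used to throw
    // "This method is unsupported in old planner."
    tEnv.explain(false)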
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index f40b75d..3a97106 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -37,7 +37,7 @@ import org.apache.flink.table.operations.ddl._
import org.apache.flink.table.operations.utils.OperationTreeBuilder
import org.apache.flink.table.operations.{CatalogQueryOperation, TableSourceQueryOperation, _}
import org.apache.flink.table.planner.{ParserImpl, PlanningConfigurationBuilder}
-import org.apache.flink.table.sinks.{OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils}
+import org.apache.flink.table.sinks.{BatchTableSink, OutputFormatTableSink, OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils}
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.types.DataType
import org.apache.flink.table.util.JavaScalaConversionUtil
@@ -878,11 +878,11 @@ abstract class TableEnvImpl(
tableSink: TableSink[T]): DataSink[_]
/**
- * Add the given [[DataSink]] into the buffer.
+ * Add the given [[ModifyOperation]] into the buffer.
*
- * @param dataSink The [[DataSink]] to add the buffer to.
+ * @param modifyOperation The [[ModifyOperation]] to add to the buffer.
*/
- protected def addToBuffer(dataSink: DataSink[_]): Unit
+ protected def addToBuffer[T](modifyOperation: ModifyOperation): Unit
override def insertInto(path: String, table: Table): Unit = {
val parser = planningConfigurationBuilder.createCalciteParser()
@@ -919,15 +919,64 @@ abstract class TableEnvImpl(
table: Table,
insertOptions: InsertOptions,
sinkIdentifier: ObjectIdentifier): Unit = {
- val dataSink = writeToSinkAndTranslate(table.getQueryOperation, insertOptions, sinkIdentifier)
- addToBuffer(dataSink)
+ val operation = new CatalogSinkModifyOperation(
+ sinkIdentifier,
+ table.getQueryOperation,
+ insertOptions.staticPartitions,
+ insertOptions.overwrite,
+ new JHashMap[String, String]())
+ addToBuffer(operation)
}
override def getParser: Parser = parser
override def getCatalogManager: CatalogManager = catalogManager
- private def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
+ protected def getTableSink(modifyOperation: ModifyOperation): TableSink[_] = {
+ modifyOperation match {
+ case s: CatalogSinkModifyOperation =>
+ getTableSink(s.getTableIdentifier) match {
+ case None =>
+ throw new TableException(
+ s"No table was registered under the name ${s.getTableIdentifier}.")
+
+ case Some(tableSink) =>
+ tableSink match {
+ case _: BatchTableSink[_] => // do nothing
+ case _: OutputFormatTableSink[_] => // do nothing
+ case _ =>
+ throw new TableException(
+ "BatchTableSink or OutputFormatTableSink required to emit batch Table.")
+ }
+ // validate schema of source table and table sink
+ TableSinkUtils.validateSink(
+ s.getStaticPartitions,
+ s.getChild,
+ s.getTableIdentifier,
+ tableSink)
+ // set static partitions if it is a partitioned table sink
+ tableSink match {
+ case partitionableSink: PartitionableTableSink =>
+ partitionableSink.setStaticPartition(s.getStaticPartitions)
+ case _ =>
+ }
+ // set whether to overwrite if it's an OverwritableTableSink
+ tableSink match {
+ case overwritableTableSink: OverwritableTableSink =>
+ overwritableTableSink.setOverwrite(s.isOverwrite)
+ case _ =>
+ require(!s.isOverwrite, "INSERT OVERWRITE requires " +
+ s"${classOf[OverwritableTableSink].getSimpleName} but actually got " +
+ tableSink.getClass.getName)
+ }
+ tableSink
+ }
+ case o =>
+ throw new TableException("Unsupported Operation: " + o.asSummaryString())
+ }
+ }
+
+ protected def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier))
.map(_.getTable) match {
case Some(s) if s.isInstanceOf[ConnectorCatalogTable[_, _]] =>
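
With sqlUpdate/insertInto now buffering a CatalogSinkModifyOperation instead of an eagerly translated DataSink, the sink lookup, schema validation and partition/overwrite configuration above all run when the buffer is translated. An invalid INSERT therefore no longer fails inside sqlUpdate itself but only once explain() or execute() touches the buffered operation, which is why the validation tests further down add an explicit execute call; for example (identifiers as in those tests):

    // buffering succeeds: no sink is resolved and no schema is validated yet
    util.tableEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b FROM sourceTable")
    // translating the buffered ModifyOperation resolves the sink and validates the schema,
    // so a mismatch surfaces here as a ValidationException
    util.tableEnv.execute("test")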
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala
new file mode 100644
index 0000000..f3fd194
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.operators.DataSink
+import org.apache.flink.table.api.internal.BatchTableEnvImpl
+import org.apache.flink.table.plan.nodes.Sink
+import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
+import org.apache.flink.types.Row
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+
+/**
+ * A special [[DataSetRel]] which makes the explain result more readable.
+ *
+ * <p>NOTES: We can't move the [[BatchTableSink#consumeDataSet]]/[[DataSet#output]] logic
+ * from [[BatchTableEnvImpl]] to this node, because the return types of
+ * [[DataSetRel#translateToPlan]] (which returns [[DataSet]]) and
+ * [[BatchTableSink#consumeDataSet]]/[[DataSet#output]] (which returns [[DataSink]]) are
+ * different. [[DataSetSink#translateToPlan]] just returns the input's translated result.
+ */
+class DataSetSink(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputRel: RelNode,
+ sink: TableSink[_],
+ sinkName: String)
+ extends Sink(cluster, traitSet, inputRel, sink, sinkName)
+ with DataSetRel {
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataSetSink(cluster, traitSet, inputs.get(0), sink, sinkName)
+ }
+
+ override def translateToPlan(tableEnv: BatchTableEnvImpl): DataSet[Row] = {
+ getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ }
+
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a872fb4..50a1bca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -220,7 +220,8 @@ object FlinkRuleSets {
DataSetValuesRule.INSTANCE,
DataSetCorrelateRule.INSTANCE,
DataSetPythonCorrelateRule.INSTANCE,
- BatchTableSourceScanRule.INSTANCE
+ BatchTableSourceScanRule.INSTANCE,
+ DataSetSinkRule.INSTANCE
)
/**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala
new file mode 100644
index 0000000..b7786b0
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.dataset.DataSetSink
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+class DataSetSinkRule
+ extends ConverterRule(
+ classOf[FlinkLogicalSink],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.DATASET,
+ "DataSetSinkRule") {
+
+ def convert(rel: RelNode): RelNode = {
+ val sink: FlinkLogicalSink = rel.asInstanceOf[FlinkLogicalSink]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
+ val convInput: RelNode = RelOptRule.convert(sink.getInput(0), FlinkConventions.DATASET)
+
+ new DataSetSink(
+ rel.getCluster,
+ traitSet,
+ convInput,
+ sink.sink,
+ sink.sinkName
+ )
+ }
+}
+
+object DataSetSinkRule {
+ val INSTANCE = new DataSetSinkRule
+}
+
+
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index a2df711..fc33f8d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -18,20 +18,22 @@
package org.apache.flink.table.api.batch
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl
-import org.apache.flink.table.utils.TableTestUtil.batchTableNode
+import org.apache.flink.table.api.{Table, Types}
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, readFromResource, replaceStageId}
import org.apache.flink.test.util.MultipleProgramsTestBase
+
import org.junit.Assert.assertEquals
import org.junit._
class ExplainTest
extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
- private val testFilePath = ExplainTest.this.getClass.getResource("/").getFile
-
@Test
def testFilterWithoutExtended(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
@@ -41,8 +43,7 @@ class ExplainTest
val table = scan.filter($"a" % 2 === 0)
val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testFilter0.out").mkString
+ val source = readFromResource("testFilter0.out")
val expected = replaceString(source, scan)
assertEquals(expected, result)
@@ -58,8 +59,7 @@ class ExplainTest
val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
.explain(table, extended = true).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testFilter1.out").mkString
+ val source = readFromResource("testFilter1.out")
val expected = replaceString(source, scan)
assertEquals(expected, result)
@@ -75,8 +75,7 @@ class ExplainTest
val table = table1.join(table2).where($"b" === $"d").select($"a", $"c")
val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testJoin0.out").mkString
+ val source = readFromResource("testJoin0.out")
val expected = replaceString(source, table1, table2)
assertEquals(expected, result)
@@ -93,8 +92,7 @@ class ExplainTest
val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
.explain(table, extended = true).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testJoin1.out").mkString
+ val source = readFromResource("testJoin1.out")
val expected = replaceString(source, table1, table2)
assertEquals(expected, result)
@@ -110,8 +108,7 @@ class ExplainTest
val table = table1.unionAll(table2)
val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testUnion0.out").mkString
+ val source = readFromResource("testUnion0.out")
val expected = replaceString(source, table1, table2)
assertEquals(expected, result)
@@ -128,13 +125,51 @@ class ExplainTest
val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
.explain(table, extended = true).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testUnion1.out").mkString
+ val source = readFromResource("testUnion1.out")
val expected = replaceString(source, table1, table2)
assertEquals(expected, result)
}
+ @Test
+ def testInsert(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = BatchTableEnvironment.create(env)
+
+ tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))
+
+ tEnv.sqlUpdate("insert into targetTable select first, id from sourceTable")
+
+ val result = tEnv.explain(false)
+ val expected = readFromResource("testInsert1.out")
+ assertEquals(replaceStageId(expected), replaceStageId(result))
+ }
+
+ @Test
+ def testMultipleInserts(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = BatchTableEnvironment.create(env)
+
+ tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable1", sink.configure(fieldNames, fieldTypes))
+ tEnv.registerTableSink("targetTable2", sink.configure(fieldNames, fieldTypes))
+
+ tEnv.sqlUpdate("insert into targetTable1 select first, id from sourceTable")
+ tEnv.sqlUpdate("insert into targetTable2 select last, id from sourceTable")
+
+ val result = tEnv.explain(false)
+ val expected = readFromResource("testMultipleInserts1.out")
+ assertEquals(replaceStageId(expected), replaceStageId(result))
+ }
def replaceString(s: String, t1: Table, t2: Table): String = {
replaceSourceNode(replaceSourceNode(replaceString(s), t1, 0), t2, 1)
@@ -144,14 +179,14 @@ class ExplainTest
replaceSourceNode(replaceString(s), t, 0)
}
- private def replaceSourceNode(s: String, t: Table, idx: Int) = {
+ private def replaceSourceNode(s: String, t: Table, idx: Int): String = {
s.replace(
s"%logicalSourceNode$idx%", batchTableNode(t)
.replace("DataSetScan", "FlinkLogicalDataSetScan"))
.replace(s"%sourceNode$idx%", batchTableNode(t))
}
- def replaceString(s: String) = {
+ def replaceString(s: String): String = {
s.replaceAll("\\r\\n", "\n")
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
index d8915df..ab07039 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -41,6 +41,8 @@ class InsertIntoValidationTest extends TableTestBase {
// must fail because table sink schema has too few fields
util.tableEnv.sqlUpdate(sql)
+ // trigger validation
+ util.tableEnv.execute("test")
}
@Test(expected = classOf[ValidationException])
@@ -57,6 +59,8 @@ class InsertIntoValidationTest extends TableTestBase {
// must fail because types of table sink do not match query result
util.tableEnv.sqlUpdate(sql)
+ // trigger validation
+ util.tableEnv.execute("test")
}
@Test(expected = classOf[ValidationException])
@@ -73,6 +77,8 @@ class InsertIntoValidationTest extends TableTestBase {
// must fail because partial insert is not supported yet.
util.tableEnv.sqlUpdate(sql)
+ // trigger validation
+ util.tableEnv.execute("test")
}
@Test
@@ -92,5 +98,7 @@ class InsertIntoValidationTest extends TableTestBase {
val sql = "INSERT INTO targetTable SELECT a, b FROM sourceTable"
util.tableEnv.sqlUpdate(sql)
+ // trigger validation
+ util.tableEnv.execute("test")
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
index 4ae77f9..210a598 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
@@ -41,6 +41,8 @@ class InsertIntoValidationTest extends TableTestBase {
util.tableEnv.scan("sourceTable")
.select('a, 'b, 'c)
.insertInto("targetTable")
+ // trigger validation
+ util.tableEnv.execute("test")
}
@Test(expected = classOf[ValidationException])
@@ -57,5 +59,7 @@ class InsertIntoValidationTest extends TableTestBase {
util.tableEnv.scan("sourceTable")
.select('a, 'b, 'c)
.insertInto("targetTable")
+ // trigger validation
+ util.tableEnv.execute("test")
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
new file mode 100644
index 0000000..9b78a10
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+ LogicalProject(first=[$0], id=[$1])
+ LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+ BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : to: Row
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : UnsafeMemoryAppendTableSink(d, e)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out
new file mode 100644
index 0000000..c8979bd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out
@@ -0,0 +1,51 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable1`], fields=[d, e])
+ LogicalProject(first=[$0], id=[$1])
+ LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable2`], fields=[d, e])
+ LogicalProject(last=[$3], id=[$1])
+ LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable1`], fields=[d, e])
+ BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable2`], fields=[d, e])
+ BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[last, id], source=[CsvTableSource(read fields: last, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : to: Row
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : UnsafeMemoryAppendTableSink(d, e)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : to: Row
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : UnsafeMemoryAppendTableSink(d, e)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+