You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/08 12:57:29 UTC

[flink] 02/05: [FLINK-17267] [table-planner] legacy batch planner supports explain insert operation

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2fc82ad5b92367c9f9a2aad5e6df3fd78108754e
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Apr 24 20:45:12 2020 +0800

    [FLINK-17267] [table-planner] legacy batch planner supports explain insert operation
---
 .../table/tests/test_table_environment_api.py      |   5 +-
 .../table/api/internal/BatchTableEnvImpl.scala     | 191 +++++++++++++++++----
 .../flink/table/api/internal/TableEnvImpl.scala    |  63 ++++++-
 .../table/plan/nodes/dataset/DataSetSink.scala     |  57 ++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala     |   3 +-
 .../table/plan/rules/dataSet/DataSetSinkRule.scala |  55 ++++++
 .../apache/flink/table/api/batch/ExplainTest.scala |  71 ++++++--
 .../sql/validation/InsertIntoValidationTest.scala  |   8 +
 .../validation/InsertIntoValidationTest.scala      |   4 +
 .../src/test/scala/resources/testInsert1.out       |  27 +++
 .../test/scala/resources/testMultipleInserts1.out  |  51 ++++++
 11 files changed, 477 insertions(+), 58 deletions(-)

diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index c1bf04a..87c8023 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -400,8 +400,9 @@ class BatchTableEnvironmentTests(TableEnvironmentTest, PyFlinkBatchTableTestCase
         t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
         t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
 
-        with self.assertRaises(TableException):
-            t_env.explain(extended=True)
+        actual = t_env.explain(extended=True)
+
+        assert isinstance(actual, str)
 
     def test_create_table_environment(self):
         table_config = TableConfig()
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 343fc23..e25b8e4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -38,8 +38,9 @@ import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
 import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
 import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{DataSetQueryOperation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, QueryOperation}
 import org.apache.flink.table.plan.BatchOptimizer
+import org.apache.flink.table.plan.nodes.LogicalSink
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.planner.Conversions
 import org.apache.flink.table.runtime.MapRunner
@@ -74,7 +75,7 @@ abstract class BatchTableEnvImpl(
     moduleManager: ModuleManager)
   extends TableEnvImpl(config, catalogManager, moduleManager) {
 
-  private val bufferedSinks = new JArrayList[DataSink[_]]
+  private val bufferedModifyOperations = new JArrayList[ModifyOperation]()
 
   private[flink] val optimizer = new BatchOptimizer(
     () => config.getPlannerConfig.unwrap(classOf[CalciteConfig]).orElse(CalciteConfig.DEFAULT),
@@ -170,8 +171,8 @@ abstract class BatchTableEnvImpl(
     }
   }
 
-  override protected def addToBuffer(sink: DataSink[_]): Unit = {
-    bufferedSinks.add(sink)
+  override protected def addToBuffer[T](modifyOperation: ModifyOperation): Unit = {
+    bufferedModifyOperations.add(modifyOperation)
   }
 
   /**
@@ -215,32 +216,69 @@ abstract class BatchTableEnvImpl(
     * @param extended Flag to include detailed optimizer estimates.
     */
   private[flink] def explain(table: Table, extended: Boolean): String = {
-    val ast = getRelBuilder.tableOperation(table.getQueryOperation).build()
-    val optimizedPlan = optimizer.optimize(ast)
-    val dataSet = translate[Row](
-      optimizedPlan,
-      getTableSchema(table.getQueryOperation.getTableSchema.getFieldNames, optimizedPlan))(
-      new GenericTypeInfo(classOf[Row]))
-    dataSet.output(new DiscardingOutputFormat[Row])
-    val env = dataSet.getExecutionEnvironment
+    explain(JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), extended)
+  }
+
+  override def explain(table: Table): String = explain(table: Table, extended = false)
+
+  override def explain(extended: Boolean): String = {
+    explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
+  }
+
+  private def explain(operations: JList[Operation], extended: Boolean): String = {
+    require(operations.asScala.nonEmpty, "operations should not be empty")
+    val astList = operations.asScala.map {
+      case queryOperation: QueryOperation =>
+        getRelBuilder.tableOperation(queryOperation).build()
+      case modifyOperation: ModifyOperation =>
+        translateToRel(modifyOperation, addLogicalSink = true)
+      case o => throw new TableException(s"Unsupported operation: ${o.asSummaryString()}")
+    }
+
+    val optimizedNodes = astList.map(optimizer.optimize)
+
+    val batchTableEnv = createDummyBatchTableEnv()
+    val dataSinks = optimizedNodes.zip(operations.asScala).map {
+      case (optimizedNode, operation) =>
+        operation match {
+          case queryOperation: QueryOperation =>
+            val dataSet = translate[Row](
+              optimizedNode,
+              getTableSchema(queryOperation.getTableSchema.getFieldNames, optimizedNode))(
+              new GenericTypeInfo(classOf[Row]))
+            dataSet.output(new DiscardingOutputFormat[Row])
+          case modifyOperation: ModifyOperation =>
+            val tableSink = getTableSink(modifyOperation)
+            translate(
+              batchTableEnv,
+              optimizedNode,
+              tableSink,
+              getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+          case o =>
+            throw new TableException("Unsupported Operation: " + o.asSummaryString())
+        }
+    }
+
+    val astPlan = astList.map(RelOptUtil.toString).mkString(System.lineSeparator)
+    val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator)
+
+    val env = dataSinks.head.getDataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan
     val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
 
     s"== Abstract Syntax Tree ==" +
-        System.lineSeparator +
-        s"${RelOptUtil.toString(ast)}" +
-        System.lineSeparator +
-        s"== Optimized Logical Plan ==" +
-        System.lineSeparator +
-        s"${RelOptUtil.toString(optimizedPlan)}" +
-        System.lineSeparator +
-        s"== Physical Execution Plan ==" +
-        System.lineSeparator +
-        s"$sqlPlan"
+      System.lineSeparator +
+      s"$astPlan" +
+      System.lineSeparator +
+      s"== Optimized Logical Plan ==" +
+      System.lineSeparator +
+      s"$optimizedPlan" +
+      System.lineSeparator +
+      s"== Physical Execution Plan ==" +
+      System.lineSeparator +
+      s"$sqlPlan"
   }
 
-  override def explain(table: Table): String = explain(table: Table, extended = false)
-
   override def execute(jobName: String): JobExecutionResult = {
     val plan = createPipelineAndClearBuffer(jobName)
 
@@ -292,10 +330,6 @@ abstract class BatchTableEnvImpl(
     createPipelineAndClearBuffer(jobName)
   }
 
-  override def explain(extended: Boolean): String = {
-    throw new TableException("This method is unsupported in old planner.")
-  }
-
   /**
     * Translate the buffered sinks to Plan, and clear the buffer.
     *
@@ -304,10 +338,11 @@ abstract class BatchTableEnvImpl(
     * If the buffer is not clear after failure, the following `translate` will also fail.
     */
   private def createPipelineAndClearBuffer(jobName: String): Pipeline = {
+    val dataSinks = translate(bufferedModifyOperations)
     try {
-      createPipeline(bufferedSinks, jobName)
+      createPipeline(dataSinks, jobName)
     } finally {
-      bufferedSinks.clear()
+      bufferedModifyOperations.clear()
     }
   }
 
@@ -357,6 +392,102 @@ abstract class BatchTableEnvImpl(
   }
 
   /**
+    * Translates a [[ModifyOperation]] into a [[RelNode]].
+    *
+    * The transformation does not involve optimizing the relational expression tree.
+    *
+    * @param modifyOperation The root ModifyOperation of the relational expression tree.
+    * @param addLogicalSink Whether to add a [[LogicalSink]] as the root.
+    *                       Currently, LogicalSink is only used for explaining.
+    * @return The [[RelNode]] that corresponds to the translated [[ModifyOperation]].
+    */
+  private def translateToRel(modifyOperation: ModifyOperation, addLogicalSink: Boolean): RelNode = {
+    val input = getRelBuilder.tableOperation(modifyOperation.getChild).build()
+    if (addLogicalSink) {
+      val tableSink = getTableSink(modifyOperation)
+      modifyOperation match {
+        case s: CatalogSinkModifyOperation =>
+          LogicalSink.create(input, tableSink, s.getTableIdentifier.toString)
+        case o =>
+          throw new TableException("Unsupported Operation: " + o.asSummaryString())
+      }
+    } else {
+      input
+    }
+  }
+
+  /**
+    * Translates a list of [[ModifyOperation]] into a list of [[DataSink]].
+    *
+    * The transformation involves optimizing the relational expression tree as defined by
+    * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
+    *
+    * @param modifyOperations The root [[ModifyOperation]]s of the relational expression tree.
+    * @return The [[DataSink]]s that correspond to the translated [[ModifyOperation]]s.
+    */
+  private def translate[T](modifyOperations: JList[ModifyOperation]): JList[DataSink[_]] = {
+    val relNodes = modifyOperations.asScala.map(o => translateToRel(o, addLogicalSink = false))
+    val optimizedNodes = relNodes.map(optimizer.optimize)
+
+    val batchTableEnv = createDummyBatchTableEnv()
+    modifyOperations.asScala.zip(optimizedNodes).map {
+      case (modifyOperation, optimizedNode) =>
+        val tableSink = getTableSink(modifyOperation)
+        translate(
+          batchTableEnv,
+          optimizedNode,
+          tableSink,
+          getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+    }.asJava
+  }
+
+  /**
+    * Translates an optimized [[RelNode]] into a [[DataSet]]
+    * and hands it over to the [[TableSink]] to write it.
+    *
+    * @param optimizedNode The [[RelNode]] to translate.
+    * @param tableSink The [[TableSink]] to write the [[Table]] to.
+    * @return The [[DataSink]] that corresponds to the [[RelNode]] and the [[TableSink]].
+    */
+  private def translate[T](
+      batchTableEnv: BatchTableEnvImpl,
+      optimizedNode: RelNode,
+      tableSink: TableSink[T],
+      tableSchema: TableSchema): DataSink[_] = {
+    tableSink match {
+      case batchSink: BatchTableSink[T] =>
+        val outputType = fromDataTypeToLegacyInfo(tableSink.getConsumedDataType)
+          .asInstanceOf[TypeInformation[T]]
+        // translate the Table into a DataSet and provide the type that the TableSink expects.
+        val result: DataSet[T] = translate(optimizedNode, tableSchema)(outputType)
+        // create a dummy NoOpOperator, which holds dummy DummyExecutionEnvironment as context.
+        // NoOpOperator will be ignored in OperatorTranslation
+        // when translating DataSet to Operator, while its input can be translated normally.
+        val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType)
+        // Give the DataSet to the TableSink to emit it.
+        batchSink.consumeDataSet(dummyOp)
+      case boundedSink: OutputFormatTableSink[T] =>
+        val outputType = fromDataTypeToLegacyInfo(tableSink.getConsumedDataType)
+          .asInstanceOf[TypeInformation[T]]
+        // translate the Table into a DataSet and provide the type that the TableSink expects.
+        val result: DataSet[T] = translate(optimizedNode, tableSchema)(outputType)
+        // create a dummy NoOpOperator, which holds DummyExecutionEnvironment as context.
+        // NoOpOperator will be ignored in OperatorTranslation
+        // when translating DataSet to Operator, while its input can be translated normally.
+        val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType)
+        // use the OutputFormat to consume the DataSet.
+        val dataSink = dummyOp.output(boundedSink.getOutputFormat)
+        dataSink.name(
+          TableConnectorUtils.generateRuntimeName(
+            boundedSink.getClass,
+            boundedSink.getTableSchema.getFieldNames))
+      case _ =>
+        throw new TableException(
+          "BatchTableSink or OutputFormatTableSink required to emit batch Table.")
+    }
+  }
+
+  /**
     * Translates a [[Table]] into a [[DataSet]].
     *
     * The transformation involves optimizing the relational expression tree as defined by
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index f40b75d..3a97106 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -37,7 +37,7 @@ import org.apache.flink.table.operations.ddl._
 import org.apache.flink.table.operations.utils.OperationTreeBuilder
 import org.apache.flink.table.operations.{CatalogQueryOperation, TableSourceQueryOperation, _}
 import org.apache.flink.table.planner.{ParserImpl, PlanningConfigurationBuilder}
-import org.apache.flink.table.sinks.{OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils}
+import org.apache.flink.table.sinks.{BatchTableSink, OutputFormatTableSink, OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils}
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.util.JavaScalaConversionUtil
@@ -878,11 +878,11 @@ abstract class TableEnvImpl(
       tableSink: TableSink[T]): DataSink[_]
 
   /**
-    * Add the given [[DataSink]] into the buffer.
+    * Add the given [[ModifyOperation]] into the buffer.
     *
-    * @param dataSink The [[DataSink]] to add the buffer to.
+    * @param modifyOperation The [[ModifyOperation]] to add to the buffer.
     */
-  protected def addToBuffer(dataSink: DataSink[_]): Unit
+  protected def addToBuffer[T](modifyOperation: ModifyOperation): Unit
 
   override def insertInto(path: String, table: Table): Unit = {
     val parser = planningConfigurationBuilder.createCalciteParser()
@@ -919,15 +919,64 @@ abstract class TableEnvImpl(
       table: Table,
       insertOptions: InsertOptions,
       sinkIdentifier: ObjectIdentifier): Unit = {
-    val dataSink = writeToSinkAndTranslate(table.getQueryOperation, insertOptions, sinkIdentifier)
-    addToBuffer(dataSink)
+    val operation = new CatalogSinkModifyOperation(
+      sinkIdentifier,
+      table.getQueryOperation,
+      insertOptions.staticPartitions,
+      insertOptions.overwrite,
+      new JHashMap[String, String]())
+    addToBuffer(operation)
   }
 
   override def getParser: Parser = parser
 
   override def getCatalogManager: CatalogManager = catalogManager
 
-  private def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
+  protected def getTableSink(modifyOperation: ModifyOperation): TableSink[_] = {
+    modifyOperation match {
+      case s: CatalogSinkModifyOperation =>
+        getTableSink(s.getTableIdentifier) match {
+          case None =>
+            throw new TableException(
+              s"No table was registered under the name ${s.getTableIdentifier}.")
+
+          case Some(tableSink) =>
+            tableSink match {
+              case _: BatchTableSink[_] => // do nothing
+              case _: OutputFormatTableSink[_] => // do nothing
+              case _ =>
+                throw new TableException(
+                  "BatchTableSink or OutputFormatTableSink required to emit batch Table.")
+            }
+            // validate schema of source table and table sink
+            TableSinkUtils.validateSink(
+              s.getStaticPartitions,
+              s.getChild,
+              s.getTableIdentifier,
+              tableSink)
+            // set static partitions if it is a partitioned table sink
+            tableSink match {
+              case partitionableSink: PartitionableTableSink =>
+                partitionableSink.setStaticPartition(s.getStaticPartitions)
+              case _ =>
+            }
+            // set whether to overwrite if it's an OverwritableTableSink
+            tableSink match {
+              case overwritableTableSink: OverwritableTableSink =>
+                overwritableTableSink.setOverwrite(s.isOverwrite)
+              case _ =>
+                require(!s.isOverwrite, "INSERT OVERWRITE requires " +
+                  s"${classOf[OverwritableTableSink].getSimpleName} but actually got " +
+                  tableSink.getClass.getName)
+            }
+            tableSink
+        }
+      case o =>
+        throw new TableException("Unsupported Operation: " + o.asSummaryString())
+    }
+  }
+
+  protected def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
     JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier))
       .map(_.getTable) match {
       case Some(s) if s.isInstanceOf[ConnectorCatalogTable[_, _]] =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala
new file mode 100644
index 0000000..f3fd194
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.operators.DataSink
+import org.apache.flink.table.api.internal.BatchTableEnvImpl
+import org.apache.flink.table.plan.nodes.Sink
+import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
+import org.apache.flink.types.Row
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+
+/**
+  * A special [[DataSetRel]] which makes the explain result prettier.
+  *
+  * <p>NOTES: We can't move the [[BatchTableSink#consumeDataSet]]/[[DataSet#output]] logic
+  * from [[BatchTableEnvImpl]] to this node, because the return types of
+  * [[DataSetRel#translateToPlan]] (which returns [[DataSet]]) and
+  * [[BatchTableSink#consumeDataSet]]/[[DataSet#output]] (which returns [[DataSink]]) are
+  * different. [[DataSetSink#translateToPlan]] just returns the input's translated result.
+  */
+class DataSetSink(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    sink: TableSink[_],
+    sinkName: String)
+  extends Sink(cluster, traitSet, inputRel, sink, sinkName)
+    with DataSetRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetSink(cluster, traitSet, inputs.get(0), sink, sinkName)
+  }
+
+  override def translateToPlan(tableEnv: BatchTableEnvImpl): DataSet[Row] = {
+    getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+  }
+
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a872fb4..50a1bca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -220,7 +220,8 @@ object FlinkRuleSets {
     DataSetValuesRule.INSTANCE,
     DataSetCorrelateRule.INSTANCE,
     DataSetPythonCorrelateRule.INSTANCE,
-    BatchTableSourceScanRule.INSTANCE
+    BatchTableSourceScanRule.INSTANCE,
+    DataSetSinkRule.INSTANCE
   )
 
   /**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala
new file mode 100644
index 0000000..b7786b0
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.dataset.DataSetSink
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+class DataSetSinkRule
+  extends ConverterRule(
+    classOf[FlinkLogicalSink],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASET,
+    "DataSetSinkRule") {
+
+  def convert(rel: RelNode): RelNode = {
+    val sink: FlinkLogicalSink = rel.asInstanceOf[FlinkLogicalSink]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
+    val convInput: RelNode = RelOptRule.convert(sink.getInput(0), FlinkConventions.DATASET)
+
+    new DataSetSink(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      sink.sink,
+      sink.sinkName
+    )
+  }
+}
+
+object DataSetSinkRule {
+  val INSTANCE = new DataSetSinkRule
+}
+
+
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index a2df711..fc33f8d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -18,20 +18,22 @@
 
 package org.apache.flink.table.api.batch
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Table
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl
-import org.apache.flink.table.utils.TableTestUtil.batchTableNode
+import org.apache.flink.table.api.{Table, Types}
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, readFromResource, replaceStageId}
 import org.apache.flink.test.util.MultipleProgramsTestBase
+
 import org.junit.Assert.assertEquals
 import org.junit._
 
 class ExplainTest
   extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
 
-  private val testFilePath = ExplainTest.this.getClass.getResource("/").getFile
-
   @Test
   def testFilterWithoutExtended(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -41,8 +43,7 @@ class ExplainTest
     val table = scan.filter($"a" % 2 === 0)
 
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter0.out").mkString
+    val source = readFromResource("testFilter0.out")
 
     val expected = replaceString(source, scan)
     assertEquals(expected, result)
@@ -58,8 +59,7 @@ class ExplainTest
 
     val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
       .explain(table, extended = true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter1.out").mkString
+    val source = readFromResource("testFilter1.out")
 
     val expected = replaceString(source, scan)
     assertEquals(expected, result)
@@ -75,8 +75,7 @@ class ExplainTest
     val table = table1.join(table2).where($"b" === $"d").select($"a", $"c")
 
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin0.out").mkString
+    val source = readFromResource("testJoin0.out")
 
     val expected = replaceString(source, table1, table2)
     assertEquals(expected, result)
@@ -93,8 +92,7 @@ class ExplainTest
 
     val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
       .explain(table, extended = true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin1.out").mkString
+    val source = readFromResource("testJoin1.out")
 
     val expected = replaceString(source, table1, table2)
     assertEquals(expected, result)
@@ -110,8 +108,7 @@ class ExplainTest
     val table = table1.unionAll(table2)
 
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion0.out").mkString
+    val source = readFromResource("testUnion0.out")
 
     val expected = replaceString(source, table1, table2)
     assertEquals(expected, result)
@@ -128,13 +125,51 @@ class ExplainTest
 
     val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
       .explain(table, extended = true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion1.out").mkString
+    val source = readFromResource("testUnion1.out")
 
     val expected = replaceString(source, table1, table2)
     assertEquals(expected, result)
   }
 
+  @Test
+  def testInsert(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = BatchTableEnvironment.create(env)
+
+    tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))
+
+    tEnv.sqlUpdate("insert into targetTable select first, id from sourceTable")
+
+    val result = tEnv.explain(false)
+    val expected = readFromResource("testInsert1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(result))
+  }
+
+  @Test
+  def testMultipleInserts(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = BatchTableEnvironment.create(env)
+
+    tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable1", sink.configure(fieldNames, fieldTypes))
+    tEnv.registerTableSink("targetTable2", sink.configure(fieldNames, fieldTypes))
+
+    tEnv.sqlUpdate("insert into targetTable1 select first, id from sourceTable")
+    tEnv.sqlUpdate("insert into targetTable2 select last, id from sourceTable")
+
+    val result = tEnv.explain(false)
+    val expected = readFromResource("testMultipleInserts1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(result))
+  }
 
   def replaceString(s: String, t1: Table, t2: Table): String = {
     replaceSourceNode(replaceSourceNode(replaceString(s), t1, 0), t2, 1)
@@ -144,14 +179,14 @@ class ExplainTest
     replaceSourceNode(replaceString(s), t, 0)
   }
 
-  private def replaceSourceNode(s: String, t: Table, idx: Int) = {
+  private def replaceSourceNode(s: String, t: Table, idx: Int): String = {
     s.replace(
       s"%logicalSourceNode$idx%", batchTableNode(t)
         .replace("DataSetScan", "FlinkLogicalDataSetScan"))
       .replace(s"%sourceNode$idx%", batchTableNode(t))
   }
 
-  def replaceString(s: String) = {
+  def replaceString(s: String): String = {
     s.replaceAll("\\r\\n", "\n")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
index d8915df..ab07039 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -41,6 +41,8 @@ class InsertIntoValidationTest extends TableTestBase {
 
     // must fail because table sink schema has too few fields
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test(expected = classOf[ValidationException])
@@ -57,6 +59,8 @@ class InsertIntoValidationTest extends TableTestBase {
 
     // must fail because types of table sink do not match query result
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test(expected = classOf[ValidationException])
@@ -73,6 +77,8 @@ class InsertIntoValidationTest extends TableTestBase {
 
     // must fail because partial insert is not supported yet.
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test
@@ -92,5 +98,7 @@ class InsertIntoValidationTest extends TableTestBase {
     val sql = "INSERT INTO targetTable SELECT a, b FROM sourceTable"
 
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
index 4ae77f9..210a598 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
@@ -41,6 +41,8 @@ class InsertIntoValidationTest extends TableTestBase {
     util.tableEnv.scan("sourceTable")
       .select('a, 'b, 'c)
       .insertInto("targetTable")
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test(expected = classOf[ValidationException])
@@ -57,5 +59,7 @@ class InsertIntoValidationTest extends TableTestBase {
     util.tableEnv.scan("sourceTable")
       .select('a, 'b, 'c)
       .insertInto("targetTable")
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
new file mode 100644
index 0000000..9b78a10
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : Map
+		content : to: Row
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		 : Data Sink
+			content : UnsafeMemoryAppendTableSink(d, e)
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out
new file mode 100644
index 0000000..c8979bd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out
@@ -0,0 +1,51 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable1`], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable2`], fields=[d, e])
+  LogicalProject(last=[$3], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable1`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable2`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[last, id], source=[CsvTableSource(read fields: last, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : Map
+		content : to: Row
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		 : Data Sink
+			content : UnsafeMemoryAppendTableSink(d, e)
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : Map
+		content : to: Row
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		 : Data Sink
+			content : UnsafeMemoryAppendTableSink(d, e)
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+