Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/08 12:57:27 UTC

[flink] branch master updated (0d1738b -> 4959ebf)

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 0d1738b  [FLINK-17256][python] Support keyword arguments in the PyFlink descriptor API
     new 59ca355  [FLINK-17267] [table-planner] legacy stream planner supports explain insert operation
     new 2fc82ad  [FLINK-17267] [table-planner] legacy batch planner supports explain insert operation
     new 6013a62  [FLINK-17267] [table] TableEnvironment#explainSql supports EXPLAIN statement
     new 1a5fc5f  [FLINK-17267] [table] Introduce TableEnvironment#explainSql api
     new 4959ebf  [FLINK-17267] [table] Introduce Table#explain api

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
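
For orientation, a rough sketch of how the user-facing APIs named in the commit
titles above (TableEnvironment#explainSql, Table#explain) are expected to be used.
This is an illustrative, hedged example only; the tableEnv variable and the table
names MyTable/MySink are assumptions, not part of the commits themselves:

    // assumes a TableEnvironment named tableEnv with MyTable and MySink registered
    val table = tableEnv.sqlQuery("SELECT a, b FROM MyTable WHERE a > 10")
    // Table#explain returns the AST, optimized plan and physical plan as a String
    println(table.explain())
    // TableEnvironment#explainSql does the same for a SQL statement, including INSERT INTO
    println(tableEnv.explainSql("INSERT INTO MySink SELECT a, b FROM MyTable WHERE a > 10"))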


Summary of changes:
 flink-python/pyflink/table/__init__.py             |   4 +-
 flink-python/pyflink/table/explain_detail.py       |  34 +++
 flink-python/pyflink/table/table.py                |  14 ++
 flink-python/pyflink/table/table_environment.py    |  19 +-
 flink-python/pyflink/table/tests/test_explain.py   |  40 +++
 .../table/tests/test_table_environment_api.py      |  34 ++-
 flink-python/pyflink/util/utils.py                 |  20 ++
 .../org/apache/flink/table/api/ExplainDetail.java} |  45 ++--
 .../java/org/apache/flink/table/api/Table.java     |  10 +
 .../apache/flink/table/api/TableEnvironment.java   |  15 +-
 .../table/api/internal/TableEnvironmentImpl.java   |  48 +++-
 .../api/internal/TableEnvironmentInternal.java     |  14 ++
 .../apache/flink/table/api/internal/TableImpl.java |   6 +
 .../flink/table/api/internal/TableResultImpl.java  |  51 +++-
 .../org/apache/flink/table/delegation/Planner.java |   7 +-
 .../flink/table/operations/ExplainOperation.java}  |  39 ++-
 .../org/apache/flink/table/utils/PlannerMock.java  |   3 +-
 .../operations/SqlToOperationConverter.java        |  19 ++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  11 +-
 .../table/planner/delegation/BatchPlanner.scala    |  30 ++-
 .../table/planner/delegation/StreamPlanner.scala   |  33 ++-
 .../explain/testExecuteSqlWithExplainInsert.out    |  31 +++
 .../explain/testExecuteSqlWithExplainSelect.out    |  21 ++
 .../resources/explain/testExplainSqlWithInsert.out |  31 +++
 .../resources/explain/testExplainSqlWithSelect.out |  21 ++
 .../flink/table/api/TableEnvironmentTest.scala     | 194 +++++++++++++-
 .../flink/table/planner/utils/TableTestBase.scala  |   2 +-
 .../table/sqlexec/SqlToOperationConverter.java     |  19 ++
 .../table/api/internal/BatchTableEnvImpl.scala     | 230 ++++++++++++++---
 .../flink/table/api/internal/TableEnvImpl.scala    |  88 ++++++-
 .../flink/table/calcite/FlinkPlannerImpl.scala     |  11 +-
 .../table/calcite/RelTimeIndicatorConverter.scala  |  30 +++
 .../flink/table/plan/nodes/LogicalSink.scala       |  55 ++++
 .../org/apache/flink/table/plan/nodes/Sink.scala   |  57 +++++
 .../table/plan/nodes/dataset/DataSetSink.scala     |  57 +++++
 .../plan/nodes/datastream/DataStreamSink.scala     | 204 +++++++++++++++
 .../plan/nodes/logical/FlinkLogicalSink.scala      |  73 ++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala     |   9 +-
 .../table/plan/rules/dataSet/DataSetSinkRule.scala |  55 ++++
 .../plan/rules/datastream/DataStreamSinkRule.scala |  53 ++++
 .../apache/flink/table/planner/StreamPlanner.scala | 280 +++++++--------------
 .../flink/table/sinks/DataStreamTableSink.scala    |  58 +++++
 .../flink/table/api/TableEnvironmentITCase.scala   |  17 +-
 .../api/batch/BatchTableEnvironmentTest.scala      | 201 ++++++++++++++-
 .../apache/flink/table/api/batch/ExplainTest.scala |  71 ++++--
 .../sql/validation/InsertIntoValidationTest.scala  |   8 +
 .../validation/InsertIntoValidationTest.scala      |   4 +
 .../flink/table/api/stream/ExplainTest.scala       |  75 ++++--
 .../api/stream/StreamTableEnvironmentTest.scala    | 206 ++++++++++++++-
 .../flink/table/utils/MockTableEnvironment.scala   |   4 +-
 .../apache/flink/table/utils/TableTestBase.scala   |  10 +
 .../resources/testExecuteSqlWithExplainInsert0.out |  31 +++
 .../resources/testExecuteSqlWithExplainInsert1.out |  36 +++
 .../resources/testExecuteSqlWithExplainSelect0.out |  21 ++
 .../resources/testExecuteSqlWithExplainSelect1.out |  27 ++
 .../scala/resources/testExplainSqlWithInsert0.out  |  31 +++
 .../scala/resources/testExplainSqlWithInsert1.out  |  43 ++++
 .../scala/resources/testExplainSqlWithSelect0.out  |  21 ++
 .../scala/resources/testExplainSqlWithSelect1.out  |  27 ++
 .../resources/testFromToDataStreamAndSqlUpdate.out |  23 +-
 .../src/test/scala/resources/testInsert.out        |  29 +++
 .../src/test/scala/resources/testInsert1.out       |  27 ++
 .../test/scala/resources/testMultipleInserts.out   |  55 ++++
 .../test/scala/resources/testMultipleInserts1.out  |  51 ++++
 .../resources/testSqlUpdateAndToDataStream.out     |  20 +-
 65 files changed, 2709 insertions(+), 404 deletions(-)
 create mode 100644 flink-python/pyflink/table/explain_detail.py
 create mode 100644 flink-python/pyflink/table/tests/test_explain.py
 copy flink-table/flink-table-api-java/src/{test/java/org/apache/flink/table/utils/PlannerMock.java => main/java/org/apache/flink/table/api/ExplainDetail.java} (50%)
 copy flink-table/flink-table-api-java/src/{test/java/org/apache/flink/table/utils/PlannerMock.java => main/java/org/apache/flink/table/operations/ExplainOperation.java} (52%)
 create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
 create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
 create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
 create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
 create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/LogicalSink.scala
 create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/Sink.scala
 create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala
 create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSink.scala
 create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala
 create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala
 create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSinkRule.scala
 create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testInsert.out
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts.out
 create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out


[flink] 03/05: [FLINK-17267] [table] TableEnvironment#explainSql supports EXPLAIN statement

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6013a62523bd2fc73b6aa7b3109f6b7c138ac51c
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Apr 22 17:08:00 2020 +0800

    [FLINK-17267] [table] TableEnvironment#explainSql supports EXPLAIN statement
---
 .../table/api/internal/TableEnvironmentImpl.java   |   9 ++
 .../flink/table/operations/ExplainOperation.java   |  46 ++++++++
 .../operations/SqlToOperationConverter.java        |  19 ++++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  11 +-
 .../table/planner/delegation/BatchPlanner.scala    |  22 +++-
 .../table/planner/delegation/StreamPlanner.scala   |  22 +++-
 .../explain/testExecuteSqlWithExplainInsert.out    |  31 +++++
 .../explain/testExecuteSqlWithExplainSelect.out    |  21 ++++
 .../flink/table/api/TableEnvironmentTest.scala     | 115 ++++++++++++++++++-
 .../table/sqlexec/SqlToOperationConverter.java     |  19 ++++
 .../table/api/internal/BatchTableEnvImpl.scala     |  71 ++++++++----
 .../flink/table/api/internal/TableEnvImpl.scala    |  13 ++-
 .../flink/table/calcite/FlinkPlannerImpl.scala     |  11 +-
 .../apache/flink/table/planner/StreamPlanner.scala |  17 ++-
 .../flink/table/api/TableEnvironmentITCase.scala   |  13 +--
 .../api/batch/BatchTableEnvironmentTest.scala      | 120 +++++++++++++++++++-
 .../api/stream/StreamTableEnvironmentTest.scala    | 126 ++++++++++++++++++++-
 .../resources/testExecuteSqlWithExplainInsert0.out |  31 +++++
 .../resources/testExecuteSqlWithExplainInsert1.out |  36 ++++++
 .../resources/testExecuteSqlWithExplainSelect0.out |  21 ++++
 .../resources/testExecuteSqlWithExplainSelect1.out |  27 +++++
 21 files changed, 744 insertions(+), 57 deletions(-)
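
As context for the diff below, a minimal sketch of the behavior this commit adds,
modeled on the tests included in the change (the tableEnv value and the table name
MyTable are illustrative):

    // executeSql now recognizes EXPLAIN PLAN FOR and returns the plan as a
    // single row with one STRING column named "result"
    val result = tableEnv.executeSql(
      "EXPLAIN PLAN FOR SELECT * FROM MyTable WHERE a > 10")
    val it = result.collect()
    while (it.hasNext) {
      // prints the AST, optimized logical plan and physical execution plan
      println(it.next().getField(0))
    }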

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 342ee55..1ca045b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -72,6 +72,7 @@ import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
@@ -852,6 +853,14 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 			return buildShowResult(listFunctions());
 		} else if (operation instanceof ShowViewsOperation) {
 			return buildShowResult(listViews());
+		} else if (operation instanceof ExplainOperation) {
+			String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()), false);
+			return TableResultImpl.builder()
+					.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+					.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
+					.data(Collections.singletonList(Row.of(explanation)))
+					.build();
+
 		} else {
 			throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
 		}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
new file mode 100644
index 0000000..c78c78f
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import java.util.Collections;
+
+/**
+ * Operation to describe an EXPLAIN statement.
+ * NOTES: currently, only default behavior(EXPLAIN PLAN FOR xx) is supported.
+ */
+public class ExplainOperation implements Operation {
+	private final Operation child;
+
+	public ExplainOperation(Operation child) {
+		this.child = child;
+	}
+
+	public Operation getChild() {
+		return child;
+	}
+
+	@Override
+	public String asSummaryString() {
+		return OperationUtils.formatWithChildren(
+				"EXPLAIN PLAN FOR",
+				Collections.emptyMap(),
+				Collections.singletonList(child),
+				Operation::asSummaryString);
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index b303570..9f80aa1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -62,6 +62,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
 import org.apache.flink.table.operations.ShowDatabasesOperation;
@@ -98,6 +99,9 @@ import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -195,6 +199,8 @@ public class SqlToOperationConverter {
 			return Optional.of(converter.convertDropView((SqlDropView) validated));
 		} else if (validated instanceof SqlShowViews) {
 			return Optional.of(converter.convertShowViews((SqlShowViews) validated));
+		} else if (validated instanceof SqlExplain) {
+			return Optional.of(converter.convertExplain((SqlExplain) validated));
 		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
 			return Optional.of(converter.convertSqlQuery(validated));
 		} else {
@@ -577,6 +583,19 @@ public class SqlToOperationConverter {
 		return new ShowViewsOperation();
 	}
 
+	/** Convert EXPLAIN statement. */
+	private Operation convertExplain(SqlExplain sqlExplain) {
+		Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
+
+		if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES ||
+				sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL ||
+				sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
+			throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
+		}
+
+		return new ExplainOperation(operation);
+	}
+
 	/** Fallback method for sql query. */
 	private Operation convertSqlQuery(SqlNode node) {
 		return toQueryOperation(flinkPlanner, node);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index c3f72b3..846f536 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -30,7 +30,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
-import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
 import java.lang.{Boolean => JBoolean}
@@ -129,7 +129,14 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowViews]) {
         return sqlNode
       }
-      validator.validate(sqlNode)
+      sqlNode match {
+        case explain: SqlExplain =>
+          val validated = validator.validate(explain.getExplicandum)
+          explain.setOperand(0, validated)
+          explain
+        case _ =>
+          validator.validate(sqlNode)
+      }
     }
     catch {
       case e: RuntimeException =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 2626809..9161753 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.planner.operations.PlannerQueryOperation
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
@@ -32,6 +33,7 @@ import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOp
 import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
 
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
+import org.apache.calcite.rel.logical.LogicalTableModify
 import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
 import org.apache.calcite.sql.SqlExplainLevel
 
@@ -80,7 +82,21 @@ class BatchPlanner(
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
-        getRelBuilder.queryOperation(queryOperation).build()
+        val relNode = getRelBuilder.queryOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation)
+          case _ =>
+            relNode
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation)
       case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index e6245f2..7006533 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.planner.operations.PlannerQueryOperation
 import org.apache.flink.table.planner.plan.`trait`._
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.planner.plan.optimize.{Optimizer, StreamCommonSubGraphBasedOptimizer}
@@ -30,6 +31,7 @@ import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOp
 import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
 
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
+import org.apache.calcite.rel.logical.LogicalTableModify
 import org.apache.calcite.sql.SqlExplainLevel
 
 import java.util
@@ -71,7 +73,21 @@ class StreamPlanner(
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
-        getRelBuilder.queryOperation(queryOperation).build()
+        val relNode = getRelBuilder.queryOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation)
+          case _ =>
+            relNode
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation)
       case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
new file mode 100644
index 0000000..0e5e015
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(d=[$0], e=[$1])
+   +- LogicalFilter(condition=[>($0, 10)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, b], where=[>(a, 10)])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : Source: Custom Source
+
+	 : Operator
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : Calc(select=[a, b], where=[(a > 10)])
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : SinkConversionToRow
+				ship_strategy : FORWARD
+
+				 : Data Sink
+					content : Sink: Unnamed
+					ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
new file mode 100644
index 0000000..4865193
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($0, 10)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[a, b, c], where=[>(a, 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : Source: Custom Source
+
+	 : Operator
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : Calc(select=[a, b, c], where=[(a > 10)])
+			ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index e289edb..a27b47a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.api
 
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.sql.SqlExplainLevel
 import org.apache.flink.api.common.typeinfo.Types.STRING
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
@@ -29,10 +27,14 @@ import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
 import org.apache.flink.table.planner.operations.SqlConversionException
 import org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase.TestUDF
 import org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.SimpleScalarFunction
+import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId
 import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks}
 import org.apache.flink.types.Row
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.sql.SqlExplainLevel
 import org.hamcrest.Matchers.containsString
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.rules.ExpectedException
 import org.junit.{Rule, Test}
 
@@ -797,6 +799,113 @@ class TableEnvironmentTest {
       tableResult5.collect())
   }
 
+  @Test
+  def testExecuteSqlWithExplainSelect(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val tableResult2 = tableEnv.executeSql("explain plan for select * from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    val it = tableResult2.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainSelect.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithExplainInsert(): Unit = {
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val tableResult3 = tableEnv.executeSql(
+      "explain plan for insert into MySink select a, b from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    val it = tableResult3.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainInsert.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithUnsupportedExplain(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    // TODO we can support them later
+    testUnsupportedExplain("explain plan excluding attributes for select * from MyTable")
+    testUnsupportedExplain("explain plan including all attributes for select * from MyTable")
+    testUnsupportedExplain("explain plan with type for select * from MyTable")
+    testUnsupportedExplain("explain plan without implementation for select * from MyTable")
+    testUnsupportedExplain("explain plan as xml for select * from MyTable")
+    testUnsupportedExplain("explain plan as json for select * from MyTable")
+  }
+
+  private def testUnsupportedExplain(explain: String): Unit = {
+    try {
+      tableEnv.executeSql(explain)
+      fail("This should not happen")
+    } catch {
+      case e: TableException =>
+        assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+      case e =>
+        fail("This should not happen, " + e.getMessage)
+    }
+  }
+
   private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
     while (expected.hasNext && actual.hasNext) {
       assertEquals(expected.next(), actual.next())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 8ebb47e..ded0d85 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -61,6 +61,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.PlannerQueryOperation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -91,6 +92,9 @@ import org.apache.flink.util.StringUtils;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -187,6 +191,8 @@ public class SqlToOperationConverter {
 			return Optional.of(converter.convertDropView((SqlDropView) validated));
 		} else if (validated instanceof SqlShowViews) {
 			return Optional.of(converter.convertShowViews((SqlShowViews) validated));
+		} else if (validated instanceof SqlExplain) {
+			return Optional.of(converter.convertExplain((SqlExplain) validated));
 		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
 			return Optional.of(converter.convertSqlQuery(validated));
 		} else {
@@ -534,6 +540,19 @@ public class SqlToOperationConverter {
 		return new ShowViewsOperation();
 	}
 
+	/** Convert EXPLAIN statement. */
+	private Operation convertExplain(SqlExplain sqlExplain) {
+		Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
+
+		if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES ||
+				sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL ||
+				sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
+			throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
+		}
+
+		return new ExplainOperation(operation);
+	}
+
 	/**
 	 * Create a table schema from {@link SqlCreateTable}. This schema contains computed column
 	 * fields, say, we have a create table DDL statement:
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index e25b8e4..efc38a5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -31,14 +31,14 @@ import org.apache.flink.configuration.DeploymentOptions
 import org.apache.flink.core.execution.{DetachedJobExecutionResult, JobClient}
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.{CalciteConfig, FlinkTypeFactory}
-import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogManager}
+import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogManager, ObjectIdentifier}
 import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
 import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
 import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, PlannerQueryOperation, QueryOperation}
 import org.apache.flink.table.plan.BatchOptimizer
 import org.apache.flink.table.plan.nodes.LogicalSink
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
@@ -57,6 +57,7 @@ import org.apache.flink.util.Preconditions.checkNotNull
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalTableModify
 
 import _root_.java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList}
 
@@ -225,11 +226,25 @@ abstract class BatchTableEnvImpl(
     explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
   }
 
-  private def explain(operations: JList[Operation], extended: Boolean): String = {
+   protected def explain(operations: JList[Operation], extended: Boolean): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astList = operations.asScala.map {
       case queryOperation: QueryOperation =>
-        getRelBuilder.tableOperation(queryOperation).build()
+        val relNode = getRelBuilder.tableOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation, addLogicalSink = true)
+          case _ =>
+            relNode
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation, addLogicalSink = true)
       case o => throw new TableException(s"Unsupported operation: ${o.asSummaryString()}")
@@ -237,27 +252,33 @@ abstract class BatchTableEnvImpl(
 
     val optimizedNodes = astList.map(optimizer.optimize)
 
-    val batchTableEnv = createDummyBatchTableEnv()
-    val dataSinks = optimizedNodes.zip(operations.asScala).map {
-      case (optimizedNode, operation) =>
-        operation match {
-          case queryOperation: QueryOperation =>
-            val dataSet = translate[Row](
-              optimizedNode,
-              getTableSchema(queryOperation.getTableSchema.getFieldNames, optimizedNode))(
-              new GenericTypeInfo(classOf[Row]))
-            dataSet.output(new DiscardingOutputFormat[Row])
-          case modifyOperation: ModifyOperation =>
-            val tableSink = getTableSink(modifyOperation)
-            translate(
-              batchTableEnv,
-              optimizedNode,
-              tableSink,
-              getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
-          case o =>
-            throw new TableException("Unsupported Operation: " + o.asSummaryString())
-        }
-    }
+     val batchTableEnv = createDummyBatchTableEnv()
+     val dataSinks = optimizedNodes.zip(operations.asScala).map {
+       case (optimizedNode, operation) =>
+         operation match {
+           case queryOperation: QueryOperation =>
+             val fieldNames = queryOperation match {
+               case o: PlannerQueryOperation if o.getCalciteTree.isInstanceOf[LogicalTableModify] =>
+                 o.getCalciteTree.getInput(0).getRowType.getFieldNames.asScala.toArray[String]
+               case _ =>
+                 queryOperation.getTableSchema.getFieldNames
+             }
+             val dataSet = translate[Row](
+               optimizedNode,
+               getTableSchema(fieldNames, optimizedNode))(
+               new GenericTypeInfo(classOf[Row]))
+             dataSet.output(new DiscardingOutputFormat[Row])
+           case modifyOperation: ModifyOperation =>
+             val tableSink = getTableSink(modifyOperation)
+             translate(
+               batchTableEnv,
+               optimizedNode,
+               tableSink,
+               getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+           case o =>
+             throw new TableException("Unsupported Operation: " + o.asSummaryString())
+         }
+     }
 
     val astPlan = astList.map(RelOptUtil.toString).mkString(System.lineSeparator)
     val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3a97106..7c6f144 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -545,8 +545,6 @@ abstract class TableEnvImpl(
 
   override def listFunctions(): Array[String] = functionCatalog.getFunctions
 
-  override def explain(table: Table): String
-
   override def getCompletionHints(statement: String, position: Int): Array[String] = {
     val planner = getFlinkPlanner
     planner.getCompletionHints(statement, position)
@@ -763,6 +761,15 @@ abstract class TableEnvImpl(
         TableResultImpl.TABLE_RESULT_OK
       case _: ShowViewsOperation =>
         buildShowResult(listViews())
+      case explainOperation: ExplainOperation =>
+        val explanation = explain(
+          JCollections.singletonList(explainOperation.getChild),
+          extended = false)
+        TableResultImpl.builder.
+          resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+          .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
+          .data(JCollections.singletonList(Row.of(explanation)))
+          .build
       case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
     }
   }
@@ -1128,6 +1135,8 @@ abstract class TableEnvImpl(
     }
   }
 
+  protected def explain(operations: JList[Operation], extended: Boolean): String
+
   override def fromValues(values: Expression*): Table = {
     createTable(operationTreeBuilder.values(values: _*))
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index b4c6210..3d7dae4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.rel.RelRoot
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.RexBuilder
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
-import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
 import _root_.java.lang.{Boolean => JBoolean}
@@ -127,7 +127,14 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowViews]) {
         return sqlNode
       }
-      validator.validate(sqlNode)
+      sqlNode match {
+        case explain: SqlExplain =>
+          val validated = validator.validate(explain.getExplicandum)
+          explain.setOperand(0, validated)
+          explain
+        case _ =>
+          validator.validate(sqlNode)
+      }
     }
     catch {
       case e: RuntimeException =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index f549168..756d9ca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -43,6 +43,7 @@ import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalTableModify
 
 import _root_.java.util
 import _root_.java.util.Objects
@@ -123,7 +124,21 @@ class StreamPlanner(
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astWithUpdatesAsRetractionTuples = operations.asScala.map {
       case queryOperation: QueryOperation =>
-        (getRelBuilder.tableOperation(queryOperation).build(), false)
+        val relNode = getRelBuilder.tableOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation)
+          case _ =>
+            (relNode, false)
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation)
       case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index c5e90bf..dd83a3e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -24,13 +24,14 @@ import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecutionEnvironment}
-import org.apache.flink.table.api.TableEnvironmentITCase.{getPersonCsvTableSource, getPersonData, readFromResource, replaceStageId}
+import org.apache.flink.table.api.TableEnvironmentITCase.{getPersonCsvTableSource, getPersonData}
 import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.api.java.StreamTableEnvironment
 import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnvironment, _}
 import org.apache.flink.table.runtime.utils.StreamITCase
 import org.apache.flink.table.sinks.CsvTableSink
 import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
 import org.apache.flink.table.utils.TestingOverwritableTableSink
 import org.apache.flink.types.Row
 import org.apache.flink.util.FileUtils
@@ -46,7 +47,6 @@ import _root_.java.lang.{Long => JLong}
 import _root_.java.util
 
 import _root_.scala.collection.mutable
-import _root_.scala.io.Source
 
 
 @RunWith(classOf[Parameterized])
@@ -452,15 +452,6 @@ object TableEnvironmentITCase {
     )
   }
 
-  def readFromResource(file: String): String = {
-    val source = s"${getClass.getResource("/").getFile}../../src/test/scala/resources/$file"
-    Source.fromFile(source).mkString
-  }
-
-  def replaceStageId(s: String): String = {
-    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
-  }
-
   def getPersonCsvTableSource: CsvTableSource = {
     val header = "First#Id#Score#Last"
     val csvRecords = getPersonData.map {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index 9823e08..d09bd5d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -25,10 +25,11 @@ import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
 import org.apache.flink.table.runtime.stream.sql.FunctionITCase.{SimpleScalarFunction, TestUDF}
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
 import org.apache.flink.types.Row
 
 import org.hamcrest.Matchers.containsString
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.Test
 
 import java.util
@@ -341,6 +342,123 @@ class BatchTableEnvironmentTest extends TableTestBase {
       tableResult4.collect())
   }
 
+  @Test
+  def testExecuteSqlWithExplainSelect(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val tableResult2 = util.tableEnv.executeSql(
+      "explain plan for select * from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    val it = tableResult2.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = readFromResource("testExecuteSqlWithExplainSelect1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithExplainInsert(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val tableResult3 = util.tableEnv.executeSql(
+      "explain plan for insert into MySink select a, b from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    val it = tableResult3.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = readFromResource("testExecuteSqlWithExplainInsert1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithUnsupportedExplain(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    // TODO we can support them later
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan excluding attributes for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan including all attributes for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan with type for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan without implementation for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan as xml for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan as json for select * from MyTable")
+  }
+
+  private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
+    try {
+      tableEnv.executeSql(explain)
+      fail("This should not happen")
+    } catch {
+      case e: TableException =>
+        assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+      case e =>
+        fail("This should not happen, " + e.getMessage)
+    }
+  }
+
   private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
     while (expected.hasNext && actual.hasNext) {
       assertEquals(expected.next(), actual.next())
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 809d0aa..439fadb 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -29,20 +29,21 @@ import org.apache.flink.table.api.Expressions.$
 import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => JStreamTableEnvironmentImpl}
 import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{EnvironmentSettings, Expressions, TableConfig, Types, ValidationException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.api.{EnvironmentSettings, ResultKind, TableConfig, TableException, Types, ValidationException}
+import org.apache.flink.table.catalog.FunctionCatalog
 import org.apache.flink.table.executor.StreamExecutor
+import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.TableTestUtil.{binaryNode, readFromResource, replaceStageId, streamTableNode, term, unaryNode}
 import org.apache.flink.table.utils.{CatalogManagerMocks, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
 import org.apache.flink.types.Row
 
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.Test
 import org.mockito.Mockito.{mock, when}
 
 import java.lang.{Integer => JInt, Long => JLong}
-import org.apache.flink.table.module.ModuleManager
 
 class StreamTableEnvironmentTest extends TableTestBase {
 
@@ -208,6 +209,123 @@ class StreamTableEnvironmentTest extends TableTestBase {
     jTEnv.fromDataStream(ds, $("rt").rowtime(), $("b"), $("c"), $("d"), $("e"), $("pt").proctime())
   }
 
+  @Test
+  def testExecuteSqlWithExplainSelect(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val tableResult2 = util.tableEnv.executeSql(
+      "explain plan for select * from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    val it = tableResult2.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = readFromResource("testExecuteSqlWithExplainSelect0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithExplainInsert(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val tableResult3 = util.tableEnv.executeSql(
+      "explain plan for insert into MySink select a, b from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    val it = tableResult3.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = readFromResource("testExecuteSqlWithExplainInsert0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithUnsupportedExplain(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    // TODO we can support them later
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan excluding attributes for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan including all attributes for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan with type for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan without implementation for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan as xml for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan as json for select * from MyTable")
+  }
+
+  private def testUnsupportedExplain(tableEnv: StreamTableEnvironment, explain: String): Unit = {
+    try {
+      tableEnv.executeSql(explain)
+      fail("This should not happen")
+    } catch {
+      case e: TableException =>
+        assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+      case e =>
+        fail("This should not happen, " + e.getMessage)
+    }
+  }
+
   private def prepareSchemaExpressionParser:
     (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
new file mode 100644
index 0000000..aad6387
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  LogicalProject(d=[$0], e=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  DataStreamCalc(select=[a, b], where=[>(a, 10)])
+    StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : from: (a, b)
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : where: (>(a, 10)), select: (a, b)
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : to: Row
+				ship_strategy : FORWARD
+
+				 : Data Sink
+					content : Sink: Unnamed
+					ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
new file mode 100644
index 0000000..98206ae
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
@@ -0,0 +1,36 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  LogicalProject(d=[$0], e=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  DataSetCalc(select=[a, b], where=[>(a, 10)])
+    BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : Map
+		content : from: (a, b)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		 : FlatMap
+			content : where: (>(a, 10)), select: (a, b)
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			driver_strategy : FlatMap
+			Partitioning : RANDOM_PARTITIONED
+
+			 : Data Sink
+				content : org.apache.flink.api.java.io.DiscardingOutputFormat
+				ship_strategy : Forward
+				exchange_mode : PIPELINED
+				Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
new file mode 100644
index 0000000..4459ad6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b, c], where=[>(a, 10)])
+  StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : Map
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : where: (>(a, 10)), select: (a, b, c)
+			ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
new file mode 100644
index 0000000..91e87ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b, c], where=[>(a, 10)])
+  BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : FlatMap
+		content : where: (>(a, 10)), select: (a, b, c)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : FlatMap
+		Partitioning : RANDOM_PARTITIONED
+
+		 : Data Sink
+			content : org.apache.flink.api.java.io.DiscardingOutputFormat
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+

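The tests and .out plan files above exercise the new EXPLAIN support through
TableEnvironment#executeSql. A minimal usage sketch, assuming MyTable was registered with the DDL
shown in the tests (the 'COLLECTION' connector is a test-only factory, and all names here are
illustrative rather than part of the patch):

    import org.apache.flink.table.api.TableEnvironment

    object ExplainSqlSketch {
      // `tableEnv` can be any TableEnvironment that already knows MyTable.
      def printPlan(tableEnv: TableEnvironment): Unit = {
        val explainResult = tableEnv.executeSql(
          "explain plan for select * from MyTable where a > 10")

        // The result has a single row with one STRING column named "result"; it carries the
        // abstract syntax tree, the optimized logical plan and the physical execution plan.
        val it = explainResult.collect()
        println(it.next().getField(0))
      }
    }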

[flink] 05/05: [FLINK-17267] [table] Introduce Table#explain api

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4959ebfc349bb80f88582f9ef2d60d09f1140737
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Apr 24 23:04:33 2020 +0800

    [FLINK-17267] [table] Introduce Table#explain api
    
    This closes #11905
---
 flink-python/pyflink/table/explain_detail.py       |  4 +-
 flink-python/pyflink/table/table.py                | 14 ++++++
 flink-python/pyflink/table/table_environment.py    |  5 +--
 .../{explain_detail.py => tests/test_explain.py}   | 30 ++++++++-----
 .../table/tests/test_table_environment_api.py      |  2 +-
 flink-python/pyflink/util/utils.py                 |  4 +-
 .../org/apache/flink/table/api/ExplainDetail.java  |  4 +-
 .../java/org/apache/flink/table/api/Table.java     | 10 +++++
 .../table/api/internal/TableEnvironmentImpl.java   |  8 +++-
 .../api/internal/TableEnvironmentInternal.java     | 14 ++++++
 .../apache/flink/table/api/internal/TableImpl.java |  6 +++
 .../flink/table/api/internal/TableResultImpl.java  | 51 +++++++++++++++++++---
 .../table/planner/delegation/StreamPlanner.scala   |  2 +-
 .../flink/table/api/TableEnvironmentTest.scala     | 24 +++++++++-
 .../table/api/internal/BatchTableEnvImpl.scala     |  8 ++--
 .../flink/table/api/internal/TableEnvImpl.scala    |  8 ++--
 .../api/batch/BatchTableEnvironmentTest.scala      | 22 ++++++++++
 .../api/stream/StreamTableEnvironmentTest.scala    | 22 ++++++++++
 18 files changed, 202 insertions(+), 36 deletions(-)

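Below, TableImpl gains an explain(ExplainDetail...) method that delegates to the new
TableEnvironmentInternal#explainInternal. A short sketch of how the API reads from user code,
assuming a TableEnvironment with MyTable already registered (query and names are illustrative):

    import org.apache.flink.table.api.{ExplainDetail, TableEnvironment}

    object TableExplainSketch {
      def printPlans(tableEnv: TableEnvironment): Unit = {
        val table = tableEnv.sqlQuery("select * from MyTable where a > 10")

        // Default output: abstract syntax tree, optimized logical plan, physical execution plan.
        println(table.explain())

        // Extra details, e.g. the changelog mode of each physical node
        // (only meaningful on the blink planner in streaming mode).
        println(table.explain(ExplainDetail.CHANGELOG_MODE))
      }
    }
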
diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py
index 48e7ce9..0cbcbe9 100644
--- a/flink-python/pyflink/table/explain_detail.py
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -29,6 +29,6 @@ class ExplainDetail(object):
     # 0.0 memory}
     ESTIMATED_COST = 0
 
-    # The changelog traits produced by a physical rel node.
+    # The changelog mode produced by a physical rel node.
     # e.g. GroupAggregate(..., changelogMode=[I,UA,D])
-    CHANGELOG_TRAITS = 1
+    CHANGELOG_MODE = 1
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index 728d331..b74797e 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -21,6 +21,7 @@ from pyflink.java_gateway import get_gateway
 from pyflink.table.table_schema import TableSchema
 
 from pyflink.util.utils import to_jarray
+from pyflink.util.utils import to_j_explain_detail_arr
 
 __all__ = ['Table', 'GroupedTable', 'GroupWindowedTable', 'OverWindowedTable', 'WindowGroupedTable']
 
@@ -718,6 +719,19 @@ class Table(object):
         """
         self._j_table.executeInsert(table_path, overwrite)
 
+    def explain(self, *extra_details):
+        """
+        Returns the AST of this table and the execution plan.
+
+        :param extra_details: The extra explain details which the explain result should include,
+                              e.g. estimated cost, changelog mode for streaming
+        :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
+        :return: The AST of this table and the execution plan to compute the result of this table.
+        :rtype: str
+        """
+        j_extra_details = to_j_explain_detail_arr(extra_details)
+        return self._j_table.explain(j_extra_details)
+
     def __str__(self):
         return self._j_table.toString()
 
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 94ff785..91073d8 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -471,13 +471,12 @@ class TableEnvironment(object):
 
     def explain_sql(self, stmt, *extra_details):
         """
-        Returns the AST of the specified statement and the execution plan to compute
-        the result of the given statement.
+        Returns the AST of the specified statement and the execution plan.
 
         :param stmt: The statement for which the AST and execution plan will be returned.
         :type stmt: str
         :param extra_details: The extra explain details which the explain result should include,
-                              e.g. estimated cost, change log trait for streaming
+                              e.g. estimated cost, changelog mode for streaming
         :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
         :return: The statement for which the AST and execution plan will be returned.
         :rtype: str
diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/tests/test_explain.py
similarity index 58%
copy from flink-python/pyflink/table/explain_detail.py
copy to flink-python/pyflink/table/tests/test_explain.py
index 48e7ce9..1dccaba 100644
--- a/flink-python/pyflink/table/explain_detail.py
+++ b/flink-python/pyflink/table/tests/test_explain.py
@@ -16,19 +16,25 @@
 # limitations under the License.
 ################################################################################
 
-__all__ = ['ExplainDetail']
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+from pyflink.table.explain_detail import ExplainDetail
 
 
-class ExplainDetail(object):
-    """
-    ExplainDetail defines the types of details for explain result.
-    """
+class StreamTableExplainTests(PyFlinkStreamTableTestCase):
 
-    # The cost information on physical rel node estimated by optimizer.
-    # e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network,
-    # 0.0 memory}
-    ESTIMATED_COST = 0
+    def test_explain(self):
+        t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
+        result = t.group_by("c").select("a.sum, c as b").explain(ExplainDetail.CHANGELOG_MODE)
 
-    # The changelog traits produced by a physical rel node.
-    # e.g. GroupAggregate(..., changelogMode=[I,UA,D])
-    CHANGELOG_TRAITS = 1
+        assert isinstance(result, str)
+
+
+if __name__ == '__main__':
+    import unittest
+
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index bd279af..96987de 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -265,7 +265,7 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
             source_sink_utils.TestAppendSink(field_names, field_types))
 
         result = t_env.explain_sql(
-            "select a + 1, b, c from %s" % source, ExplainDetail.ESTIMATED_COST)
+            "select a + 1, b, c from %s" % source, ExplainDetail.CHANGELOG_MODE)
 
         assert isinstance(result, str)
 
diff --git a/flink-python/pyflink/util/utils.py b/flink-python/pyflink/util/utils.py
index 29a20da..065b537 100644
--- a/flink-python/pyflink/util/utils.py
+++ b/flink-python/pyflink/util/utils.py
@@ -134,8 +134,8 @@ def to_j_explain_detail_arr(p_extra_details):
     gateway = get_gateway()
 
     def to_j_explain_detail(p_extra_detail):
-        if p_extra_detail == ExplainDetail.CHANGELOG_TRAITS:
-            return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_TRAITS
+        if p_extra_detail == ExplainDetail.CHANGELOG_MODE:
+            return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_MODE
         else:
             return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
index 6e9d014..5dfddc3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
@@ -29,8 +29,8 @@ public enum ExplainDetail {
 	ESTIMATED_COST,
 
 	/**
-	 * The changelog traits produced by a physical rel node.
+	 * The changelog mode produced by a physical rel node.
 	 * e.g. GroupAggregate(..., changelogMode=[I,UA,D])
 	 */
-	CHANGELOG_TRAITS
+	CHANGELOG_MODE
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 3362ca5..d1239e6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1473,4 +1473,14 @@ public interface Table {
 	 * @return The insert operation execution result.
 	 */
 	TableResult executeInsert(String tablePath, boolean overwrite);
+
+	/**
+	 * Returns the AST of this table and the execution plan to compute
+	 * the result of this table.
+	 *
+	 * @param extraDetails The extra explain details which the explain result should include,
+	 *   e.g. estimated cost, changelog mode for streaming
+	 * @return AST and the execution plan.
+	 */
+	String explain(ExplainDetail... extraDetails);
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 610627c..490e416 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -614,6 +614,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	}
 
 	@Override
+	public String explainInternal(List<Operation> operations, ExplainDetail... extraDetails) {
+		return planner.explain(operations, extraDetails);
+	}
+
+	@Override
 	public String[] getCompletionHints(String statement, int position) {
 		return planner.getCompletionHints(statement, position);
 	}
@@ -873,6 +878,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 					.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
 					.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
 					.data(Collections.singletonList(Row.of(explanation)))
+					.setPrintStyle(TableResultImpl.PrintStyle.RAW_CONTENT)
 					.build();
 
 		} else {
@@ -997,7 +1003,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	protected ExplainDetail[] getExplainDetails(boolean extended) {
 		if (extended) {
 			if (isStreamingMode) {
-				return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS };
+				return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE };
 			} else {
 				return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST };
 			}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
index 2319538..228713c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
@@ -19,11 +19,13 @@
 package org.apache.flink.table.api.internal;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
 
 import java.util.List;
 
@@ -56,4 +58,16 @@ interface TableEnvironmentInternal extends TableEnvironment {
 	 * @return the affected row counts (-1 means unknown).
 	 */
 	TableResult executeInternal(List<ModifyOperation> operations);
+
+	/**
+	 * Returns the AST of the given operations and the execution plan to compute
+	 * the result of the given operations.
+	 *
+	 * @param operations The operations to be explained.
+	 * @param extraDetails The extra explain details which the explain result should include,
+	 *   e.g. estimated cost, changelog mode for streaming
+	 * @return AST and the execution plan.
+	 */
+	String explainInternal(List<Operation> operations, ExplainDetail... extraDetails);
+
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 9b2e319..d3a63aa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.internal;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.AggregatedTable;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.FlatAggregateTable;
 import org.apache.flink.table.api.GroupWindow;
 import org.apache.flink.table.api.GroupWindowedTable;
@@ -565,6 +566,11 @@ public class TableImpl implements Table {
 	}
 
 	@Override
+	public String explain(ExplainDetail... extraDetails) {
+		return tableEnvironment.explainInternal(Collections.singletonList(getQueryOperation()), extraDetails);
+	}
+
+	@Override
 	public String toString() {
 		if (tableName == null) {
 			tableName = "UnnamedTable$" + uniqueId.getAndIncrement();
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
index 791ee89..a783976 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.utils.PrintUtils;
@@ -51,16 +52,19 @@ class TableResultImpl implements TableResult {
 	private final TableSchema tableSchema;
 	private final ResultKind resultKind;
 	private final Iterator<Row> data;
+	private final PrintStyle printStyle;
 
 	private TableResultImpl(
 			@Nullable JobClient jobClient,
 			TableSchema tableSchema,
 			ResultKind resultKind,
-			Iterator<Row> data) {
+			Iterator<Row> data,
+			PrintStyle printStyle) {
 		this.jobClient = jobClient;
 		this.tableSchema = Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
 		this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
 		this.data = Preconditions.checkNotNull(data, "data should not be null");
+		this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
 	}
 
 	@Override
@@ -86,7 +90,18 @@ class TableResultImpl implements TableResult {
 	@Override
 	public void print() {
 		Iterator<Row> it = collect();
-		PrintUtils.printAsTableauForm(getTableSchema(), it, new PrintWriter(System.out));
+		switch (printStyle) {
+			case TABLEAU:
+				PrintUtils.printAsTableauForm(getTableSchema(), it, new PrintWriter(System.out));
+				break;
+			case RAW_CONTENT:
+				while (it.hasNext()) {
+					System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
+				}
+				break;
+			default:
+				throw new TableException("Unsupported print style: " + printStyle);
+		}
 	}
 
 	public static Builder builder() {
@@ -101,6 +116,7 @@ class TableResultImpl implements TableResult {
 		private TableSchema tableSchema = null;
 		private ResultKind resultKind = null;
 		private Iterator<Row> data = null;
+		private PrintStyle printStyle = PrintStyle.TABLEAU;
 
 		private Builder() {
 		}
@@ -138,7 +154,7 @@ class TableResultImpl implements TableResult {
 		}
 
 		/**
-		 * Specifies an row iterator as the execution result .
+		 * Specifies a row iterator as the execution result.
 		 *
 		 * @param rowIterator a row iterator as the execution result.
 		 */
@@ -149,7 +165,7 @@ class TableResultImpl implements TableResult {
 		}
 
 		/**
-		 * Specifies an row list as the execution result .
+		 * Specifies a row list as the execution result.
 		 *
 		 * @param rowList a row list as the execution result.
 		 */
@@ -160,11 +176,36 @@ class TableResultImpl implements TableResult {
 		}
 
 		/**
+		 * Specifies print style. Default is {@link PrintStyle#TABLEAU}.
+		 */
+		public Builder setPrintStyle(PrintStyle printStyle) {
+			Preconditions.checkNotNull(printStyle, "printStyle should not be null");
+			this.printStyle = printStyle;
+			return this;
+		}
+
+		/**
 		 * Returns a {@link TableResult} instance.
 		 */
 		public TableResult build() {
-			return new TableResultImpl(jobClient, tableSchema, resultKind, data);
+			return new TableResultImpl(jobClient, tableSchema, resultKind, data, printStyle);
 		}
 	}
 
+	/**
+	 * PrintStyle defines the styles of printing.
+	 */
+	public enum PrintStyle {
+		/**
+		 * Print the result schema and content in tableau form.
+		 */
+		TABLEAU,
+
+		/**
+		 * Only print the result content in raw form.
+		 * The column delimiter is "," and the row delimiter is "\n".
+		 */
+		RAW_CONTENT
+	}
+
 }
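
The PrintStyle introduced above is what keeps TableResult#print readable for EXPLAIN results:
explain results are built with RAW_CONTENT, so the plan string is written out unchanged instead of
being rendered as a tableau table. A small sketch of the effect (a fragment; `tableEnv` and MyTable
are assumed as in the earlier sketches):

    // Built with PrintStyle.RAW_CONTENT, so print() emits the plan string as-is,
    // one row per line, without the tableau borders used by the default style.
    val explainResult = tableEnv.executeSql(
      "explain plan for select * from MyTable where a > 10")
    explainResult.print()
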
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 959de06..5dc6a6f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -114,7 +114,7 @@ class StreamPlanner(
     } else {
       SqlExplainLevel.DIGEST_ATTRIBUTES
     }
-    val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_TRAITS)
+    val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_MODE)
     sb.append(ExecNodePlanDumper.dagToString(
       execNodes,
       explainLevel,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 0e197ba..aa6bd2e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -911,7 +911,7 @@ class TableEnvironmentTest {
     assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
 
     val actual = tableEnv.explainSql(
-      "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_TRAITS)
+      "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_MODE)
     val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
     assertEquals(replaceStageId(expected), replaceStageId(actual))
   }
@@ -951,6 +951,28 @@ class TableEnvironmentTest {
     assertEquals(replaceStageId(expected), replaceStageId(actual))
   }
 
+  @Test
+  def testTableExplain(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = tableEnv.sqlQuery("select * from MyTable where a > 10")
+      .explain(ExplainDetail.CHANGELOG_MODE)
+    val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def testUnsupportedExplain(explain: String): Unit = {
     try {
       tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index b3caf20..7ba116d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -217,7 +217,7 @@ abstract class BatchTableEnvImpl(
     * @param extended Flag to include detailed optimizer estimates.
     */
   private[flink] def explain(table: Table, extended: Boolean): String = {
-    explain(
+    explainInternal(
       JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]),
       getExplainDetails(extended): _*)
   }
@@ -225,12 +225,14 @@ abstract class BatchTableEnvImpl(
   override def explain(table: Table): String = explain(table: Table, extended = false)
 
   override def explain(extended: Boolean): String = {
-    explain(
+    explainInternal(
       bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava,
       getExplainDetails(extended): _*)
   }
 
-  protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String = {
+  protected override def explainInternal(
+      operations: JList[Operation],
+      extraDetails: ExplainDetail*): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astList = operations.asScala.map {
       case queryOperation: QueryOperation =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 1f01186..4c6cbd4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.operators.DataSink
 import org.apache.flink.core.execution.JobClient
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.internal.TableResultImpl.PrintStyle
 import org.apache.flink.table.calcite.{CalciteParser, FlinkPlannerImpl, FlinkRelBuilder}
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.catalog.exceptions.{TableNotExistException => _, _}
@@ -762,11 +763,12 @@ abstract class TableEnvImpl(
       case _: ShowViewsOperation =>
         buildShowResult(listViews())
       case explainOperation: ExplainOperation =>
-        val explanation = explain(JCollections.singletonList(explainOperation.getChild))
+        val explanation = explainInternal(JCollections.singletonList(explainOperation.getChild))
         TableResultImpl.builder.
           resultKind(ResultKind.SUCCESS_WITH_CONTENT)
           .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
           .data(JCollections.singletonList(Row.of(explanation)))
+          .setPrintStyle(PrintStyle.RAW_CONTENT)
           .build
 
       case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
@@ -1142,10 +1144,10 @@ abstract class TableEnvImpl(
         "Unsupported SQL query! explainSql() only accepts a single SQL query.")
     }
 
-    explain(operations, extraDetails: _*)
+    explainInternal(operations, extraDetails: _*)
   }
 
-  protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String
+  protected def explainInternal(operations: JList[Operation], extraDetails: ExplainDetail*): String
 
   override def fromValues(values: Expression*): Table = {
     createTable(operationTreeBuilder.values(values: _*))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index c928314..74d820e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -504,6 +504,28 @@ class BatchTableEnvironmentTest extends TableTestBase {
     assertEquals(replaceStageId(expected), replaceStageId(actual))
   }
 
+  @Test
+  def testTableExplain(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = util.tableEnv.sqlQuery("select * from MyTable where a > 10").explain()
+    val expected = readFromResource("testExplainSqlWithSelect1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
     try {
       tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 25bb536..bb710c6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -384,6 +384,28 @@ class StreamTableEnvironmentTest extends TableTestBase {
     assertEquals(replaceStageId(expected), replaceStageId(actual))
   }
 
+  @Test
+  def testTableExplain(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = util.tableEnv.sqlQuery("select * from MyTable where a > 10").explain()
+    val expected = readFromResource("testExplainSqlWithSelect0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def prepareSchemaExpressionParser:
     (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
 


[flink] 02/05: [FLINK-17267] [table-planner] legacy batch planner supports explain insert operation

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2fc82ad5b92367c9f9a2aad5e6df3fd78108754e
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Apr 24 20:45:12 2020 +0800

    [FLINK-17267] [table-planner] legacy batch planner supports explain insert operation
---
 .../table/tests/test_table_environment_api.py      |   5 +-
 .../table/api/internal/BatchTableEnvImpl.scala     | 191 +++++++++++++++++----
 .../flink/table/api/internal/TableEnvImpl.scala    |  63 ++++++-
 .../table/plan/nodes/dataset/DataSetSink.scala     |  57 ++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala     |   3 +-
 .../table/plan/rules/dataSet/DataSetSinkRule.scala |  55 ++++++
 .../apache/flink/table/api/batch/ExplainTest.scala |  71 ++++++--
 .../sql/validation/InsertIntoValidationTest.scala  |   8 +
 .../validation/InsertIntoValidationTest.scala      |   4 +
 .../src/test/scala/resources/testInsert1.out       |  27 +++
 .../test/scala/resources/testMultipleInserts1.out  |  51 ++++++
 11 files changed, 477 insertions(+), 58 deletions(-)

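This commit teaches the legacy batch planner to explain buffered INSERT operations, which is why
the Python test below stops expecting a TableException from explain(extended=True). A rough Scala
equivalent of that flow, assuming MyTable, sink1 and sink2 were registered beforehand (names are
illustrative, not part of the patch):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.table.api.scala.BatchTableEnvironment

    object BatchExplainInsertSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tableEnv = BatchTableEnvironment.create(env)

        // register MyTable, sink1 and sink2 here ...

        tableEnv.sqlUpdate("insert into sink1 select * from MyTable where a > 100")
        tableEnv.sqlUpdate("insert into sink2 select * from MyTable where a < 100")

        // Before this change the old planner rejected explain for buffered inserts; now it
        // prints the AST, optimized logical plan and physical plan of all buffered operations.
        println(tableEnv.explain(true))
      }
    }
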
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index c1bf04a..87c8023 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -400,8 +400,9 @@ class BatchTableEnvironmentTests(TableEnvironmentTest, PyFlinkBatchTableTestCase
         t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
         t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
 
-        with self.assertRaises(TableException):
-            t_env.explain(extended=True)
+        actual = t_env.explain(extended=True)
+
+        assert isinstance(actual, str)
 
     def test_create_table_environment(self):
         table_config = TableConfig()
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 343fc23..e25b8e4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -38,8 +38,9 @@ import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
 import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
 import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{DataSetQueryOperation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, QueryOperation}
 import org.apache.flink.table.plan.BatchOptimizer
+import org.apache.flink.table.plan.nodes.LogicalSink
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.planner.Conversions
 import org.apache.flink.table.runtime.MapRunner
@@ -74,7 +75,7 @@ abstract class BatchTableEnvImpl(
     moduleManager: ModuleManager)
   extends TableEnvImpl(config, catalogManager, moduleManager) {
 
-  private val bufferedSinks = new JArrayList[DataSink[_]]
+  private val bufferedModifyOperations = new JArrayList[ModifyOperation]()
 
   private[flink] val optimizer = new BatchOptimizer(
     () => config.getPlannerConfig.unwrap(classOf[CalciteConfig]).orElse(CalciteConfig.DEFAULT),
@@ -170,8 +171,8 @@ abstract class BatchTableEnvImpl(
     }
   }
 
-  override protected def addToBuffer(sink: DataSink[_]): Unit = {
-    bufferedSinks.add(sink)
+  override protected def addToBuffer[T](modifyOperation: ModifyOperation): Unit = {
+    bufferedModifyOperations.add(modifyOperation)
   }
 
   /**
@@ -215,32 +216,69 @@ abstract class BatchTableEnvImpl(
     * @param extended Flag to include detailed optimizer estimates.
     */
   private[flink] def explain(table: Table, extended: Boolean): String = {
-    val ast = getRelBuilder.tableOperation(table.getQueryOperation).build()
-    val optimizedPlan = optimizer.optimize(ast)
-    val dataSet = translate[Row](
-      optimizedPlan,
-      getTableSchema(table.getQueryOperation.getTableSchema.getFieldNames, optimizedPlan))(
-      new GenericTypeInfo(classOf[Row]))
-    dataSet.output(new DiscardingOutputFormat[Row])
-    val env = dataSet.getExecutionEnvironment
+    explain(JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), extended)
+  }
+
+  override def explain(table: Table): String = explain(table: Table, extended = false)
+
+  override def explain(extended: Boolean): String = {
+    explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
+  }
+
+  private def explain(operations: JList[Operation], extended: Boolean): String = {
+    require(operations.asScala.nonEmpty, "operations should not be empty")
+    val astList = operations.asScala.map {
+      case queryOperation: QueryOperation =>
+        getRelBuilder.tableOperation(queryOperation).build()
+      case modifyOperation: ModifyOperation =>
+        translateToRel(modifyOperation, addLogicalSink = true)
+      case o => throw new TableException(s"Unsupported operation: ${o.asSummaryString()}")
+    }
+
+    val optimizedNodes = astList.map(optimizer.optimize)
+
+    val batchTableEnv = createDummyBatchTableEnv()
+    val dataSinks = optimizedNodes.zip(operations.asScala).map {
+      case (optimizedNode, operation) =>
+        operation match {
+          case queryOperation: QueryOperation =>
+            val dataSet = translate[Row](
+              optimizedNode,
+              getTableSchema(queryOperation.getTableSchema.getFieldNames, optimizedNode))(
+              new GenericTypeInfo(classOf[Row]))
+            dataSet.output(new DiscardingOutputFormat[Row])
+          case modifyOperation: ModifyOperation =>
+            val tableSink = getTableSink(modifyOperation)
+            translate(
+              batchTableEnv,
+              optimizedNode,
+              tableSink,
+              getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+          case o =>
+            throw new TableException("Unsupported Operation: " + o.asSummaryString())
+        }
+    }
+
+    val astPlan = astList.map(RelOptUtil.toString).mkString(System.lineSeparator)
+    val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator)
+
+    val env = dataSinks.head.getDataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan
     val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
 
     s"== Abstract Syntax Tree ==" +
-        System.lineSeparator +
-        s"${RelOptUtil.toString(ast)}" +
-        System.lineSeparator +
-        s"== Optimized Logical Plan ==" +
-        System.lineSeparator +
-        s"${RelOptUtil.toString(optimizedPlan)}" +
-        System.lineSeparator +
-        s"== Physical Execution Plan ==" +
-        System.lineSeparator +
-        s"$sqlPlan"
+      System.lineSeparator +
+      s"$astPlan" +
+      System.lineSeparator +
+      s"== Optimized Logical Plan ==" +
+      System.lineSeparator +
+      s"$optimizedPlan" +
+      System.lineSeparator +
+      s"== Physical Execution Plan ==" +
+      System.lineSeparator +
+      s"$sqlPlan"
   }
 
-  override def explain(table: Table): String = explain(table: Table, extended = false)
-
   override def execute(jobName: String): JobExecutionResult = {
     val plan = createPipelineAndClearBuffer(jobName)
 
@@ -292,10 +330,6 @@ abstract class BatchTableEnvImpl(
     createPipelineAndClearBuffer(jobName)
   }
 
-  override def explain(extended: Boolean): String = {
-    throw new TableException("This method is unsupported in old planner.")
-  }
-
   /**
     * Translate the buffered sinks to Plan, and clear the buffer.
     *
@@ -304,10 +338,11 @@ abstract class BatchTableEnvImpl(
     * If the buffer is not clear after failure, the following `translate` will also fail.
     */
   private def createPipelineAndClearBuffer(jobName: String): Pipeline = {
+    val dataSinks = translate(bufferedModifyOperations)
     try {
-      createPipeline(bufferedSinks, jobName)
+      createPipeline(dataSinks, jobName)
     } finally {
-      bufferedSinks.clear()
+      bufferedModifyOperations.clear()
     }
   }
 
@@ -357,6 +392,102 @@ abstract class BatchTableEnvImpl(
   }
 
   /**
+    * Translates a [[ModifyOperation]] into a [[RelNode]].
+    *
+    * The transformation does not involve optimizing the relational expression tree.
+    *
+    * @param modifyOperation The root ModifyOperation of the relational expression tree.
+    * @param addLogicalSink Whether to add a [[LogicalSink]] as the root.
+    *                       Currently, [[LogicalSink]] is only used for explaining.
+    * @return The [[RelNode]] that corresponds to the translated [[ModifyOperation]].
+    */
+  private def translateToRel(modifyOperation: ModifyOperation, addLogicalSink: Boolean): RelNode = {
+    val input = getRelBuilder.tableOperation(modifyOperation.getChild).build()
+    if (addLogicalSink) {
+      val tableSink = getTableSink(modifyOperation)
+      modifyOperation match {
+        case s: CatalogSinkModifyOperation =>
+          LogicalSink.create(input, tableSink, s.getTableIdentifier.toString)
+        case o =>
+          throw new TableException("Unsupported Operation: " + o.asSummaryString())
+      }
+    } else {
+      input
+    }
+  }
+
+  /**
+    * Translates a list of [[ModifyOperation]] into a list of [[DataSink]].
+    *
+    * The transformation involves optimizing the relational expression tree as defined by
+    * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
+    *
+    * @param modifyOperations The root [[ModifyOperation]]s of the relational expression tree.
+    * @return The [[DataSink]]s that correspond to the translated [[ModifyOperation]]s.
+    */
+  private def translate[T](modifyOperations: JList[ModifyOperation]): JList[DataSink[_]] = {
+    val relNodes = modifyOperations.asScala.map(o => translateToRel(o, addLogicalSink = false))
+    val optimizedNodes = relNodes.map(optimizer.optimize)
+
+    val batchTableEnv = createDummyBatchTableEnv()
+    modifyOperations.asScala.zip(optimizedNodes).map {
+      case (modifyOperation, optimizedNode) =>
+        val tableSink = getTableSink(modifyOperation)
+        translate(
+          batchTableEnv,
+          optimizedNode,
+          tableSink,
+          getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+    }.asJava
+  }
+
+  /**
+    * Translates an optimized [[RelNode]] into a [[DataSet]]
+    * and hands it over to the [[TableSink]] to write it.
+    *
+    * @param optimizedNode The [[RelNode]] to translate.
+    * @param tableSink The [[TableSink]] to write the [[Table]] to.
+    * @return The [[DataSink]] that corresponds to the [[RelNode]] and the [[TableSink]].
+    */
+  private def translate[T](
+      batchTableEnv: BatchTableEnvImpl,
+      optimizedNode: RelNode,
+      tableSink: TableSink[T],
+      tableSchema: TableSchema): DataSink[_] = {
+    tableSink match {
+      case batchSink: BatchTableSink[T] =>
+        val outputType = fromDataTypeToLegacyInfo(tableSink.getConsumedDataType)
+          .asInstanceOf[TypeInformation[T]]
+        // translate the Table into a DataSet and provide the type that the TableSink expects.
+        val result: DataSet[T] = translate(optimizedNode, tableSchema)(outputType)
+        // create a dummy NoOpOperator, which holds a DummyExecutionEnvironment as context.
+        // NoOpOperator will be ignored in OperatorTranslation
+        // when translating DataSet to Operator, while its input can be translated normally.
+        val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType)
+        // Give the DataSet to the TableSink to emit it.
+        batchSink.consumeDataSet(dummyOp)
+      case boundedSink: OutputFormatTableSink[T] =>
+        val outputType = fromDataTypeToLegacyInfo(tableSink.getConsumedDataType)
+          .asInstanceOf[TypeInformation[T]]
+        // translate the Table into a DataSet and provide the type that the TableSink expects.
+        val result: DataSet[T] = translate(optimizedNode, tableSchema)(outputType)
+        // create a dummy NoOpOperator, which holds DummyExecutionEnvironment as context.
+        // NoOpOperator will be ignored in OperatorTranslation
+        // when translating DataSet to Operator, while its input can be translated normally.
+        val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType)
+        // use the OutputFormat to consume the DataSet.
+        val dataSink = dummyOp.output(boundedSink.getOutputFormat)
+        dataSink.name(
+          TableConnectorUtils.generateRuntimeName(
+            boundedSink.getClass,
+            boundedSink.getTableSchema.getFieldNames))
+      case _ =>
+        throw new TableException(
+          "BatchTableSink or OutputFormatTableSink required to emit batch Table.")
+    }
+  }
+
+  /**
     * Translates a [[Table]] into a [[DataSet]].
     *
     * The transformation involves optimizing the relational expression tree as defined by
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index f40b75d..3a97106 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -37,7 +37,7 @@ import org.apache.flink.table.operations.ddl._
 import org.apache.flink.table.operations.utils.OperationTreeBuilder
 import org.apache.flink.table.operations.{CatalogQueryOperation, TableSourceQueryOperation, _}
 import org.apache.flink.table.planner.{ParserImpl, PlanningConfigurationBuilder}
-import org.apache.flink.table.sinks.{OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils}
+import org.apache.flink.table.sinks.{BatchTableSink, OutputFormatTableSink, OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils}
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.util.JavaScalaConversionUtil
@@ -878,11 +878,11 @@ abstract class TableEnvImpl(
       tableSink: TableSink[T]): DataSink[_]
 
   /**
-    * Add the given [[DataSink]] into the buffer.
+    * Add the given [[ModifyOperation]] into the buffer.
     *
-    * @param dataSink The [[DataSink]] to add the buffer to.
+    * @param modifyOperation The [[ModifyOperation]] to add to the buffer.
     */
-  protected def addToBuffer(dataSink: DataSink[_]): Unit
+  protected def addToBuffer[T](modifyOperation: ModifyOperation): Unit
 
   override def insertInto(path: String, table: Table): Unit = {
     val parser = planningConfigurationBuilder.createCalciteParser()
@@ -919,15 +919,64 @@ abstract class TableEnvImpl(
       table: Table,
       insertOptions: InsertOptions,
       sinkIdentifier: ObjectIdentifier): Unit = {
-    val dataSink = writeToSinkAndTranslate(table.getQueryOperation, insertOptions, sinkIdentifier)
-    addToBuffer(dataSink)
+    val operation = new CatalogSinkModifyOperation(
+      sinkIdentifier,
+      table.getQueryOperation,
+      insertOptions.staticPartitions,
+      insertOptions.overwrite,
+      new JHashMap[String, String]())
+    addToBuffer(operation)
   }
 
   override def getParser: Parser = parser
 
   override def getCatalogManager: CatalogManager = catalogManager
 
-  private def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
+  protected def getTableSink(modifyOperation: ModifyOperation): TableSink[_] = {
+    modifyOperation match {
+      case s: CatalogSinkModifyOperation =>
+        getTableSink(s.getTableIdentifier) match {
+          case None =>
+            throw new TableException(
+              s"No table was registered under the name ${s.getTableIdentifier}.")
+
+          case Some(tableSink) =>
+            tableSink match {
+              case _: BatchTableSink[_] => // do nothing
+              case _: OutputFormatTableSink[_] => // do nothing
+              case _ =>
+                throw new TableException(
+                  "BatchTableSink or OutputFormatTableSink required to emit batch Table.")
+            }
+            // validate schema of source table and table sink
+            TableSinkUtils.validateSink(
+              s.getStaticPartitions,
+              s.getChild,
+              s.getTableIdentifier,
+              tableSink)
+            // set static partitions if it is a partitioned table sink
+            tableSink match {
+              case partitionableSink: PartitionableTableSink =>
+                partitionableSink.setStaticPartition(s.getStaticPartitions)
+              case _ =>
+            }
+            // set whether to overwrite if it's an OverwritableTableSink
+            tableSink match {
+              case overwritableTableSink: OverwritableTableSink =>
+                overwritableTableSink.setOverwrite(s.isOverwrite)
+              case _ =>
+                require(!s.isOverwrite, "INSERT OVERWRITE requires " +
+                  s"${classOf[OverwritableTableSink].getSimpleName} but actually got " +
+                  tableSink.getClass.getName)
+            }
+            tableSink
+        }
+      case o =>
+        throw new TableException("Unsupported Operation: " + o.asSummaryString())
+    }
+  }
+
+  protected def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
     JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier))
       .map(_.getTable) match {
       case Some(s) if s.isInstanceOf[ConnectorCatalogTable[_, _]] =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala
new file mode 100644
index 0000000..f3fd194
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.operators.DataSink
+import org.apache.flink.table.api.internal.BatchTableEnvImpl
+import org.apache.flink.table.plan.nodes.Sink
+import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
+import org.apache.flink.types.Row
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+
+/**
+  * A special [[DataSetRel]] which makes the explain result prettier.
+  *
+  * <p>NOTES: We can't move the [[BatchTableSink#consumeDataSet]]/[[DataSet#output]] logic
+  * from [[BatchTableEnvImpl]] to this node, because the return types of
+  * [[DataSetRel#translateToPlan]] (which returns [[DataSet]]) and
+  * [[BatchTableSink#consumeDataSet]]/[[DataSet#output]] (which returns [[DataSink]]) are
+  * different. [[DataSetSink#translateToPlan]] just returns the input's translated result.
+  */
+class DataSetSink(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    sink: TableSink[_],
+    sinkName: String)
+  extends Sink(cluster, traitSet, inputRel, sink, sinkName)
+    with DataSetRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetSink(cluster, traitSet, inputs.get(0), sink, sinkName)
+  }
+
+  override def translateToPlan(tableEnv: BatchTableEnvImpl): DataSet[Row] = {
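+    // only forward the input's translation here; the actual BatchTableSink#consumeDataSet /
+    // DataSet#output call still happens in BatchTableEnvImpl (see the class-level note)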
+    getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+  }
+
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a872fb4..50a1bca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -220,7 +220,8 @@ object FlinkRuleSets {
     DataSetValuesRule.INSTANCE,
     DataSetCorrelateRule.INSTANCE,
     DataSetPythonCorrelateRule.INSTANCE,
-    BatchTableSourceScanRule.INSTANCE
+    BatchTableSourceScanRule.INSTANCE,
+    DataSetSinkRule.INSTANCE
   )
 
   /**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala
new file mode 100644
index 0000000..b7786b0
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.dataset.DataSetSink
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+class DataSetSinkRule
+  extends ConverterRule(
+    classOf[FlinkLogicalSink],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASET,
+    "DataSetSinkRule") {
+
+  def convert(rel: RelNode): RelNode = {
+    val sink: FlinkLogicalSink = rel.asInstanceOf[FlinkLogicalSink]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
+    val convInput: RelNode = RelOptRule.convert(sink.getInput(0), FlinkConventions.DATASET)
+
+    new DataSetSink(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      sink.sink,
+      sink.sinkName
+    )
+  }
+}
+
+object DataSetSinkRule {
+  val INSTANCE = new DataSetSinkRule
+}
+
+
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index a2df711..fc33f8d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -18,20 +18,22 @@
 
 package org.apache.flink.table.api.batch
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Table
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl
-import org.apache.flink.table.utils.TableTestUtil.batchTableNode
+import org.apache.flink.table.api.{Table, Types}
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, readFromResource, replaceStageId}
 import org.apache.flink.test.util.MultipleProgramsTestBase
+
 import org.junit.Assert.assertEquals
 import org.junit._
 
 class ExplainTest
   extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
 
-  private val testFilePath = ExplainTest.this.getClass.getResource("/").getFile
-
   @Test
   def testFilterWithoutExtended(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -41,8 +43,7 @@ class ExplainTest
     val table = scan.filter($"a" % 2 === 0)
 
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter0.out").mkString
+    val source = readFromResource("testFilter0.out")
 
     val expected = replaceString(source, scan)
     assertEquals(expected, result)
@@ -58,8 +59,7 @@ class ExplainTest
 
     val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
       .explain(table, extended = true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter1.out").mkString
+    val source = readFromResource("testFilter1.out")
 
     val expected = replaceString(source, scan)
     assertEquals(expected, result)
@@ -75,8 +75,7 @@ class ExplainTest
     val table = table1.join(table2).where($"b" === $"d").select($"a", $"c")
 
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin0.out").mkString
+    val source = readFromResource("testJoin0.out")
 
     val expected = replaceString(source, table1, table2)
     assertEquals(expected, result)
@@ -93,8 +92,7 @@ class ExplainTest
 
     val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
       .explain(table, extended = true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin1.out").mkString
+    val source = readFromResource("testJoin1.out")
 
     val expected = replaceString(source, table1, table2)
     assertEquals(expected, result)
@@ -110,8 +108,7 @@ class ExplainTest
     val table = table1.unionAll(table2)
 
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion0.out").mkString
+    val source = readFromResource("testUnion0.out")
 
     val expected = replaceString(source, table1, table2)
     assertEquals(expected, result)
@@ -128,13 +125,51 @@ class ExplainTest
 
     val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
       .explain(table, extended = true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion1.out").mkString
+    val source = readFromResource("testUnion1.out")
 
     val expected = replaceString(source, table1, table2)
     assertEquals(expected, result)
   }
 
+  @Test
+  def testInsert(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = BatchTableEnvironment.create(env)
+
+    tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))
+
+    tEnv.sqlUpdate("insert into targetTable select first, id from sourceTable")
+
+    val result = tEnv.explain(false)
+    val expected = readFromResource("testInsert1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(result))
+  }
+
+  @Test
+  def testMultipleInserts(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = BatchTableEnvironment.create(env)
+
+    tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable1", sink.configure(fieldNames, fieldTypes))
+    tEnv.registerTableSink("targetTable2", sink.configure(fieldNames, fieldTypes))
+
+    tEnv.sqlUpdate("insert into targetTable1 select first, id from sourceTable")
+    tEnv.sqlUpdate("insert into targetTable2 select last, id from sourceTable")
+
+    val result = tEnv.explain(false)
+    val expected = readFromResource("testMultipleInserts1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(result))
+  }
 
   def replaceString(s: String, t1: Table, t2: Table): String = {
     replaceSourceNode(replaceSourceNode(replaceString(s), t1, 0), t2, 1)
@@ -144,14 +179,14 @@ class ExplainTest
     replaceSourceNode(replaceString(s), t, 0)
   }
 
-  private def replaceSourceNode(s: String, t: Table, idx: Int) = {
+  private def replaceSourceNode(s: String, t: Table, idx: Int): String = {
     s.replace(
       s"%logicalSourceNode$idx%", batchTableNode(t)
         .replace("DataSetScan", "FlinkLogicalDataSetScan"))
       .replace(s"%sourceNode$idx%", batchTableNode(t))
   }
 
-  def replaceString(s: String) = {
+  def replaceString(s: String): String = {
     s.replaceAll("\\r\\n", "\n")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
index d8915df..ab07039 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -41,6 +41,8 @@ class InsertIntoValidationTest extends TableTestBase {
 
     // must fail because table sink schema has too few fields
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test(expected = classOf[ValidationException])
@@ -57,6 +59,8 @@ class InsertIntoValidationTest extends TableTestBase {
 
     // must fail because types of table sink do not match query result
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test(expected = classOf[ValidationException])
@@ -73,6 +77,8 @@ class InsertIntoValidationTest extends TableTestBase {
 
     // must fail because partial insert is not supported yet.
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test
@@ -92,5 +98,7 @@ class InsertIntoValidationTest extends TableTestBase {
     val sql = "INSERT INTO targetTable SELECT a, b FROM sourceTable"
 
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
index 4ae77f9..210a598 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
@@ -41,6 +41,8 @@ class InsertIntoValidationTest extends TableTestBase {
     util.tableEnv.scan("sourceTable")
       .select('a, 'b, 'c)
       .insertInto("targetTable")
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test(expected = classOf[ValidationException])
@@ -57,5 +59,7 @@ class InsertIntoValidationTest extends TableTestBase {
     util.tableEnv.scan("sourceTable")
       .select('a, 'b, 'c)
       .insertInto("targetTable")
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
new file mode 100644
index 0000000..9b78a10
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : Map
+		content : to: Row
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		 : Data Sink
+			content : UnsafeMemoryAppendTableSink(d, e)
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out
new file mode 100644
index 0000000..c8979bd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out
@@ -0,0 +1,51 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable1`], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable2`], fields=[d, e])
+  LogicalProject(last=[$3], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable1`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable2`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[last, id], source=[CsvTableSource(read fields: last, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : Map
+		content : to: Row
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		 : Data Sink
+			content : UnsafeMemoryAppendTableSink(d, e)
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : Map
+		content : to: Row
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		 : Data Sink
+			content : UnsafeMemoryAppendTableSink(d, e)
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+


[flink] 01/05: [FLINK-17267] [table-planner] legacy stream planner supports explain insert operation

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 59ca355f05b668f1cb41cae02ce182ccc7c4d707
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Apr 24 15:22:09 2020 +0800

    [FLINK-17267] [table-planner] legacy stream planner supports explain insert operation
---
 .../table/calcite/RelTimeIndicatorConverter.scala  |  30 +++
 .../flink/table/plan/nodes/LogicalSink.scala       |  55 +++++
 .../org/apache/flink/table/plan/nodes/Sink.scala   |  57 +++++
 .../plan/nodes/datastream/DataStreamSink.scala     | 204 ++++++++++++++++
 .../plan/nodes/logical/FlinkLogicalSink.scala      |  73 ++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala     |   6 +-
 .../plan/rules/datastream/DataStreamSinkRule.scala |  53 +++++
 .../apache/flink/table/planner/StreamPlanner.scala | 263 ++++++---------------
 .../flink/table/sinks/DataStreamTableSink.scala    |  58 +++++
 .../flink/table/api/TableEnvironmentITCase.scala   |   4 +-
 .../flink/table/api/stream/ExplainTest.scala       |  75 ++++--
 .../apache/flink/table/utils/TableTestBase.scala   |  10 +
 .../resources/testFromToDataStreamAndSqlUpdate.out |  23 +-
 .../src/test/scala/resources/testInsert.out        |  29 +++
 .../test/scala/resources/testMultipleInserts.out   |  55 +++++
 .../resources/testSqlUpdateAndToDataStream.out     |  20 +-
 16 files changed, 785 insertions(+), 230 deletions(-)
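
For context, a minimal sketch of what this commit enables, assuming a `sourceTable` source and
a `sinkTable` sink have already been registered (as the updated stream ExplainTest does); the
table names and registration step below are illustrative, not part of the commit:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.scala.StreamTableEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // ... register sourceTable and sinkTable here ...

    tEnv.sqlUpdate("INSERT INTO sinkTable SELECT a, b FROM sourceTable")

    // the explain result now also covers the buffered INSERT: the AST contains a sink node
    // (e.g. LogicalSink) and the optimized plan its physical counterpart (e.g. DataStreamSink)
    println(tEnv.explain(false))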

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 7f85245..82b3f2b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -32,6 +32,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType,
 import org.apache.flink.table.catalog.BasicOperatorTable
 import org.apache.flink.table.functions.sql.ProctimeSqlFunction
 import org.apache.flink.table.plan.logical.rel._
+import org.apache.flink.table.plan.nodes.LogicalSink
 import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
 
 import java.util.{Collections => JCollections}
@@ -173,6 +174,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
     case temporalTableJoin: LogicalTemporalTableJoin =>
       visit(temporalTableJoin)
 
+    case sink: LogicalSink =>
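+      // materialize proctime indicator fields of the sink's input before they are written out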
+      var newInput = sink.getInput.accept(this)
+      var needsConversion = false
+
+      val projects = newInput.getRowType.getFieldList.map { field =>
+        if (isProctimeIndicatorType(field.getType)) {
+          needsConversion = true
+          rexBuilder.makeCall(ProctimeSqlFunction, new RexInputRef(field.getIndex, field.getType))
+        } else {
+          new RexInputRef(field.getIndex, field.getType)
+        }
+      }
+
+      // add final conversion if necessary
+      if (needsConversion) {
+        newInput = LogicalProject.create(newInput, projects, newInput.getRowType.getFieldNames)
+      }
+      new LogicalSink(
+        sink.getCluster,
+        sink.getTraitSet,
+        newInput,
+        sink.sink,
+        sink.sinkName)
+
     case _ =>
       throw new TableException(s"Unsupported logical operator: ${other.getClass.getSimpleName}")
   }
@@ -392,6 +417,11 @@ object RelTimeIndicatorConverter {
     val converter = new RelTimeIndicatorConverter(rexBuilder)
     val convertedRoot = rootRel.accept(converter)
 
+    // if the root is a LogicalSink, it has already been fully converted above
+    // (including proctime materialization), so return it directly
+    if (rootRel.isInstanceOf[LogicalSink]) {
+      return convertedRoot
+    }
+
     var needsConversion = false
 
     // materialize remaining proctime indicators
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/LogicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/LogicalSink.scala
new file mode 100644
index 0000000..478bb85
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/LogicalSink.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Sink]] that is a relational expression
+  * which writes out the data of its input node into a [[TableSink]].
+  * This class corresponds to a Calcite logical rel.
+  */
+final class LogicalSink(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    sink: TableSink[_],
+    sinkName: String)
+  extends Sink(cluster, traitSet, input, sink, sinkName) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new LogicalSink(cluster, traitSet, inputs.head, sink, sinkName)
+  }
+
+}
+
+object LogicalSink {
+
+  def create(input: RelNode, sink: TableSink[_], sinkName: String): LogicalSink = {
+    val traits = input.getCluster.traitSetOf(Convention.NONE)
+    new LogicalSink(input.getCluster, traits, input, sink, sinkName)
+  }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/Sink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/Sink.scala
new file mode 100644
index 0000000..cd873d1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/Sink.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+
+/**
+  * Relational expression that writes out the data of its input node into a [[TableSink]].
+  *
+  * @param cluster  cluster that this relational expression belongs to
+  * @param traitSet the traits of this rel
+  * @param input    input relational expression
+  * @param sink     Table sink to write into
+  * @param sinkName Name of the table sink; this is not a required property and may be null
+  */
+abstract class Sink(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    val sink: TableSink[_],
+    val sinkName: String)
+  extends SingleRel(cluster, traitSet, input) {
+
+  override def deriveRowType(): RelDataType = {
+    val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val tableSchema = sink.getTableSchema
+    typeFactory.buildLogicalRowType(tableSchema.getFieldNames, tableSchema.getFieldTypes)
+  }
+
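+  // renders the sink in plan output, e.g. LogicalSink(name=[...], fields=[d, e]),
+  // which is what the expected .out resources added in this change assert on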
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .itemIf("name", sinkName, sinkName != null)
+      .item("fields", sink.getTableSchema.getFieldNames.mkString(", "))
+  }
+
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSink.scala
new file mode 100644
index 0000000..6f2df68
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSink.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink => StreamingDataStreamSink}
+import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.Sink
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
+import org.apache.flink.table.planner.{DataStreamConversions, StreamPlanner}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.sinks.{AppendStreamTableSink, DataStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
+import org.apache.flink.table.types.utils.TypeConversions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+
+import _root_.java.lang.{Boolean => JBool}
+
+import _root_.scala.collection.JavaConverters._
+
+
+/**
+  * Stream physical RelNode that writes data into an external sink defined by a [[TableSink]].
+  */
+class DataStreamSink(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    sink: TableSink[_],
+    sinkName: String)
+  extends Sink(cluster, traitSet, inputRel, sink, sinkName)
+  with DataStreamRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamSink(cluster, traitSet, inputs.get(0), sink, sinkName)
+  }
+
+  override def translateToPlan(planner: StreamPlanner): DataStream[CRow] = {
+    val inputTransform = writeToSink(planner).asInstanceOf[Transformation[CRow]]
+    new DataStream(planner.getExecutionEnvironment, inputTransform)
+  }
+
+  private def writeToSink[T](planner: StreamPlanner): Transformation[_] = {
+    sink match {
+      case retractSink: RetractStreamTableSink[T] =>
+        val resultSink = writeToRetractSink(retractSink, planner)
+        resultSink.getTransformation
+
+      case upsertSink: UpsertStreamTableSink[T] =>
+        val resultSink = writeToUpsertSink(upsertSink, planner)
+        resultSink.getTransformation
+
+      case appendSink: AppendStreamTableSink[T] =>
+        val resultSink = writeToAppendSink(appendSink, planner)
+        resultSink.getTransformation
+
+      case dataStreamTableSink: DataStreamTableSink[_] =>
+        // if no change flags are requested, verify table is an insert-only (append-only) table.
+        if (!dataStreamTableSink.withChangeFlag && !UpdatingPlanChecker.isAppendOnly(getInput)) {
+          throw new ValidationException(
+            "Table is not an append-only table. " +
+              "Use the toRetractStream() in order to handle add and retract messages.")
+        }
+        val dataStream = translateInput(
+          planner,
+          getTableSchema,
+          dataStreamTableSink.getOutputType,
+          dataStreamTableSink.withChangeFlag)
+        dataStream.getTransformation
+
+      case _ =>
+        throw new ValidationException("Stream Tables can only be emitted by AppendStreamTableSink, "
+          + "RetractStreamTableSink, or UpsertStreamTableSink.")
+    }
+  }
+
+  private def writeToRetractSink[T](
+      sink: RetractStreamTableSink[T],
+      planner: StreamPlanner): StreamingDataStreamSink[_]= {
+    // retraction sink can always be used
+    val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
+      .asInstanceOf[TypeInformation[JTuple2[JBool, T]]]
+    // translate the Table into a DataStream and provide the type that the TableSink expects.
+    val result: DataStream[JTuple2[JBool, T]] =
+      translateToType(
+        planner,
+        withChangeFlag = true,
+        outputType)
+    // Give the DataStream to the TableSink to emit it.
+    sink.consumeDataStream(result)
+  }
+
+  private def writeToAppendSink[T](
+      sink: AppendStreamTableSink[T],
+      planner: StreamPlanner): StreamingDataStreamSink[_]= {
+    // verify table is an insert-only (append-only) table
+    if (!UpdatingPlanChecker.isAppendOnly(getInput)) {
+      throw new TableException(
+        "AppendStreamTableSink requires that Table has only insert changes.")
+    }
+    val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
+      .asInstanceOf[TypeInformation[T]]
+    val resultType = getTableSchema
+    // translate the Table into a DataStream and provide the type that the TableSink expects.
+    val result: DataStream[T] =
+      translateInput(
+        planner,
+        resultType,
+        outputType,
+        withChangeFlag = false)
+    // Give the DataStream to the TableSink to emit it.
+    sink.consumeDataStream(result)
+  }
+
+  private def writeToUpsertSink[T](
+      sink: UpsertStreamTableSink[T],
+      planner: StreamPlanner): StreamingDataStreamSink[_] = {
+    // optimize plan
+    // check for append only table
+    val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(getInput)
+    sink.setIsAppendOnly(isAppendOnlyTable)
+    // extract unique key fields
+    val sinkFieldNames = sink.getTableSchema.getFieldNames
+    val tableKeys: Option[Array[String]] = UpdatingPlanChecker
+      .getUniqueKeyFields(getInput, sinkFieldNames)
+    // check that we have keys if the table has changes (is not append-only)
+    tableKeys match {
+      case Some(keys) => sink.setKeyFields(keys)
+      case None if isAppendOnlyTable => sink.setKeyFields(null)
+      case None if !isAppendOnlyTable => throw new TableException(
+        "UpsertStreamTableSink requires that Table has full primary keys if it is updated.")
+    }
+    val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
+      .asInstanceOf[TypeInformation[JTuple2[JBool, T]]]
+    val resultType = getTableSchema
+    // translate the Table into a DataStream and provide the type that the TableSink expects.
+    val result: DataStream[JTuple2[JBool, T]] =
+      translateInput(
+        planner,
+        resultType,
+        outputType,
+        withChangeFlag = true)
+    // Give the DataStream to the TableSink to emit it.
+    sink.consumeDataStream(result)
+  }
+
+  private def translateToType[A](
+      planner: StreamPlanner,
+      withChangeFlag: Boolean,
+      tpe: TypeInformation[A]): DataStream[A] = {
+    val rowType = getTableSchema
+
+    // if no change flags are requested, verify table is an insert-only (append-only) table.
+    if (!withChangeFlag && !UpdatingPlanChecker.isAppendOnly(input)) {
+      throw new ValidationException(
+        "Table is not an append-only table. " +
+          "Use the toRetractStream() in order to handle add and retract messages.")
+    }
+
+    // get CRow plan
+    translateInput(planner, rowType, tpe, withChangeFlag)
+  }
+
+  private def translateInput[A](
+      planner: StreamPlanner,
+      logicalSchema: TableSchema,
+      tpe: TypeInformation[A],
+      withChangeFlag: Boolean): DataStream[A] = {
+    val dataStream = getInput().asInstanceOf[DataStreamRel].translateToPlan(planner)
+    DataStreamConversions.convert(dataStream, logicalSchema, withChangeFlag, tpe, planner.getConfig)
+  }
+
+  /**
+    * Returns the record type of the optimized plan with field names of the logical plan.
+    */
+  private def getTableSchema: TableSchema = {
+    val fieldTypes = getInput.getRowType.getFieldList.asScala.map(_.getType)
+      .map(FlinkTypeFactory.toTypeInfo)
+      .map(TypeConversions.fromLegacyInfoToDataType)
+      .toArray
+    TableSchema.builder().fields(sink.getTableSchema.getFieldNames, fieldTypes).build()
+  }
+
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala
new file mode 100644
index 0000000..2e671ac
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.{FlinkConventions, LogicalSink, Sink}
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Sub-class of [[Sink]] that is a relational expression
+  * which writes out the data of its input node into a [[TableSink]].
+  */
+class FlinkLogicalSink(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    sink: TableSink[_],
+    sinkName: String)
+  extends Sink(cluster, traitSet, input, sink, sinkName)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new FlinkLogicalSink(cluster, traitSet, inputs.head, sink, sinkName)
+  }
+
+}
+
+private class FlinkLogicalSinkConverter
+  extends ConverterRule(
+    classOf[LogicalSink],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalSinkConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val sink = rel.asInstanceOf[LogicalSink]
+    val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalSink.create(newInput, sink.sink, sink.sinkName)
+  }
+}
+
+object FlinkLogicalSink {
+  val CONVERTER: ConverterRule = new FlinkLogicalSinkConverter()
+
+  def create(input: RelNode, sink: TableSink[_], sinkName: String): FlinkLogicalSink = {
+    val cluster = input.getCluster
+    val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalSink(cluster, traitSet, input, sink, sinkName)
+  }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 964dd04..a872fb4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -146,7 +146,8 @@ object FlinkRuleSets {
     FlinkLogicalTableFunctionScan.CONVERTER,
     FlinkLogicalMatch.CONVERTER,
     FlinkLogicalTableAggregate.CONVERTER,
-    FlinkLogicalWindowTableAggregate.CONVERTER
+    FlinkLogicalWindowTableAggregate.CONVERTER,
+    FlinkLogicalSink.CONVERTER
   )
 
   /**
@@ -265,7 +266,8 @@ object FlinkRuleSets {
     DataStreamTableAggregateRule.INSTANCE,
     DataStreamGroupWindowTableAggregateRule.INSTANCE,
     DataStreamPythonCalcRule.INSTANCE,
-    DataStreamPythonCorrelateRule.INSTANCE
+    DataStreamPythonCorrelateRule.INSTANCE,
+    DataStreamSinkRule.INSTANCE
   )
 
   /**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSinkRule.scala
new file mode 100644
index 0000000..6d02308
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSinkRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSink
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+class DataStreamSinkRule
+  extends ConverterRule(
+    classOf[FlinkLogicalSink],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamSinkRule") {
+
+  def convert(rel: RelNode): RelNode = {
+    val sink: FlinkLogicalSink = rel.asInstanceOf[FlinkLogicalSink]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convInput: RelNode = RelOptRule.convert(sink.getInput(0), FlinkConventions.DATASTREAM)
+
+    new DataStreamSink(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      sink.sink,
+      sink.sinkName
+    )
+  }
+}
+
+object DataStreamSinkRule {
+  val INSTANCE = new DataStreamSinkRule
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 659bcfe..f549168 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -18,10 +18,8 @@
 
 package org.apache.flink.table.planner
 import org.apache.flink.annotation.VisibleForTesting
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
+import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite._
@@ -34,8 +32,8 @@ import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryConte
 import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
 import org.apache.flink.table.operations._
 import org.apache.flink.table.plan.StreamOptimizer
+import org.apache.flink.table.plan.nodes.LogicalSink
 import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
-import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.types.utils.TypeConversions
@@ -46,7 +44,6 @@ import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
 
-import _root_.java.lang.{Boolean => JBool}
 import _root_.java.util
 import _root_.java.util.Objects
 import _root_.java.util.function.{Supplier => JSupplier}
@@ -111,20 +108,55 @@ class StreamPlanner(
 
   override def getParser: Parser = parser
 
-  override def translate(tableOperations: util.List[ModifyOperation])
-    : util.List[Transformation[_]] = {
-    tableOperations.asScala.map(translate).filter(Objects.nonNull).asJava
+  override def translate(
+      tableOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
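+    // each ModifyOperation is converted into a RelNode tree rooted at a LogicalSink,
+    // optimized, and then translated into a CRow DataStream transformation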
+    val planner = createDummyPlanner()
+    tableOperations.asScala.map { operation =>
+      val (ast, updatesAsRetraction) = translateToRel(operation)
+      val optimizedPlan = optimizer.optimize(ast, updatesAsRetraction, getRelBuilder)
+      val dataStream = translateToCRow(planner, optimizedPlan)
+      dataStream.getTransformation.asInstanceOf[Transformation[_]]
+    }.filter(Objects.nonNull).asJava
   }
 
   override def explain(operations: util.List[Operation], extended: Boolean): String = {
-    operations.asScala.map {
+    require(operations.asScala.nonEmpty, "operations should not be empty")
+    val astWithUpdatesAsRetractionTuples = operations.asScala.map {
       case queryOperation: QueryOperation =>
-        explain(queryOperation)
+        (getRelBuilder.tableOperation(queryOperation).build(), false)
       case modifyOperation: ModifyOperation =>
-        explain(modifyOperation.getChild)
-      case operation =>
-        throw new TableException(s"${operation.getClass.getCanonicalName} is not supported")
-    }.mkString(s"${System.lineSeparator}${System.lineSeparator}")
+        translateToRel(modifyOperation)
+      case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
+    }
+
+    val optimizedNodes = astWithUpdatesAsRetractionTuples.map {
+      case (ast, updatesAsRetraction) =>
+        optimizer.optimize(ast, updatesAsRetraction, getRelBuilder)
+    }
+
+    val planner = createDummyPlanner()
+    val dataStreams = optimizedNodes.map(p => translateToCRow(planner, p))
+
+    val astPlan = astWithUpdatesAsRetractionTuples.map {
+      p => RelOptUtil.toString(p._1)
+    }.mkString(System.lineSeparator)
+    val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator)
+
+    val env = dataStreams.head.getExecutionEnvironment
+    val jsonSqlPlan = env.getExecutionPlan
+    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
+
+    s"== Abstract Syntax Tree ==" +
+      System.lineSeparator +
+      s"$astPlan" +
+      System.lineSeparator +
+      s"== Optimized Logical Plan ==" +
+      System.lineSeparator +
+      s"$optimizedPlan" +
+      System.lineSeparator +
+      s"== Physical Execution Plan ==" +
+      System.lineSeparator +
+      s"$sqlPlan"
   }
 
   override def getCompletionHints(
@@ -135,11 +167,10 @@ class StreamPlanner(
     planner.getCompletionHints(statement, position)
   }
 
-  private def translate(tableOperation: ModifyOperation)
-    : Transformation[_] = {
-    tableOperation match {
-      case s : UnregisteredSinkModifyOperation[_] =>
-        writeToSink(s.getChild, s.getSink)
+  private def translateToRel(modifyOperation: ModifyOperation): (RelNode, Boolean) = {
+    modifyOperation match {
+      case s: UnregisteredSinkModifyOperation[_] =>
+        writeToSink(s.getChild, s.getSink, "UnregisteredSink")
 
       case catalogSink: CatalogSinkModifyOperation =>
         getTableSink(catalogSink.getTableIdentifier)
@@ -164,7 +195,10 @@ class StreamPlanner(
                   s"${classOf[OverwritableTableSink].getSimpleName} but actually got " +
                   sink.getClass.getName)
             }
-            writeToSink(catalogSink.getChild, sink)
+            writeToSink(
+              catalogSink.getChild,
+              sink,
+              catalogSink.getTableIdentifier.asSummaryString())
           }) match {
           case Some(t) => t
           case None =>
@@ -178,41 +212,19 @@ class StreamPlanner(
           case UpdateMode.UPSERT => (false, true)
         }
 
-        translateToType(
-          tableOperation.getChild,
-          isRetract,
-          withChangeFlag,
-          TypeConversions.fromDataTypeToLegacyInfo(outputConversion.getType)).getTransformation
+        val tableSink = new DataStreamTableSink(
+          outputConversion.getChild.getTableSchema,
+          TypeConversions.fromDataTypeToLegacyInfo(outputConversion.getType),
+          withChangeFlag)
+        val input = getRelBuilder.tableOperation(modifyOperation.getChild).build()
+        val sink = LogicalSink.create(input, tableSink, "DataStreamTableSink")
+        (sink, isRetract)
 
       case _ =>
-        throw new TableException(s"Unsupported ModifyOperation: $tableOperation")
+        throw new TableException(s"Unsupported ModifyOperation: $modifyOperation")
     }
   }
 
-  private def explain(tableOperation: QueryOperation) = {
-    val ast = getRelBuilder.tableOperation(tableOperation).build()
-    val optimizedPlan = optimizer
-      .optimize(ast, updatesAsRetraction = false, getRelBuilder)
-    val dataStream = translateToCRow(optimizedPlan)
-
-    val env = dataStream.getExecutionEnvironment
-    val jsonSqlPlan = env.getExecutionPlan
-
-    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
-
-    s"== Abstract Syntax Tree ==" +
-      System.lineSeparator +
-      s"${RelOptUtil.toString(ast)}" +
-      System.lineSeparator +
-      s"== Optimized Logical Plan ==" +
-      System.lineSeparator +
-      s"${RelOptUtil.toString(optimizedPlan)}" +
-      System.lineSeparator +
-      s"== Physical Execution Plan ==" +
-      System.lineSeparator +
-      s"$sqlPlan"
-  }
-
   private def getFlinkPlanner: FlinkPlannerImpl = {
     val currentCatalogName = catalogManager.getCurrentCatalog
     val currentDatabase = catalogManager.getCurrentDatabase
@@ -232,9 +244,7 @@ class StreamPlanner(
   private[flink] def getExecutionEnvironment: StreamExecutionEnvironment =
     executor.asInstanceOf[StreamExecutor].getExecutionEnvironment
 
-  private def translateToCRow(logicalPlan: RelNode): DataStream[CRow] = {
-    val planner = createDummyPlanner()
-
+  private def translateToCRow(planner: StreamPlanner, logicalPlan: RelNode): DataStream[CRow] = {
     logicalPlan match {
       case node: DataStreamRel =>
         getExecutionEnvironment.configure(
@@ -249,163 +259,38 @@ class StreamPlanner(
 
   private def writeToSink[T](
       tableOperation: QueryOperation,
-      sink: TableSink[T])
-    : Transformation[_] = {
+      sink: TableSink[T],
+      sinkName: String): (RelNode, Boolean) = {
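+    // determine whether updates must be encoded as retractions and wrap the input in a
+    // LogicalSink; the actual sink translation now happens in DataStreamSink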
 
-    val resultSink = sink match {
+    val updatesAsRetraction = sink match {
       case retractSink: RetractStreamTableSink[T] =>
         retractSink match {
           case _: PartitionableTableSink =>
             throw new TableException("Partitionable sink in retract stream mode " +
               "is not supported yet!")
-          case _ =>
+          case _ => // do nothing
         }
-        writeToRetractSink(retractSink, tableOperation)
+        true
 
       case upsertSink: UpsertStreamTableSink[T] =>
         upsertSink match {
           case _: PartitionableTableSink =>
             throw new TableException("Partitionable sink in upsert stream mode " +
               "is not supported yet!")
-          case _ =>
+          case _ => // do nothing
         }
-        writeToUpsertSink(upsertSink, tableOperation)
+        false
 
-      case appendSink: AppendStreamTableSink[T] =>
-        writeToAppendSink(appendSink, tableOperation)
+      case _: AppendStreamTableSink[T] =>
+        false
 
       case _ =>
         throw new ValidationException("Stream Tables can only be emitted by AppendStreamTableSink, "
           + "RetractStreamTableSink, or UpsertStreamTableSink.")
     }
 
-    if (resultSink != null) {
-      resultSink.getTransformation
-    } else {
-      null
-    }
-  }
-
-  private def writeToRetractSink[T](
-      sink: RetractStreamTableSink[T],
-      tableOperation: QueryOperation)
-    : DataStreamSink[_]= {
-    // retraction sink can always be used
-    val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
-      .asInstanceOf[TypeInformation[JTuple2[JBool, T]]]
-    // translate the Table into a DataStream and provide the type that the TableSink expects.
-    val result: DataStream[JTuple2[JBool, T]] =
-      translateToType(
-        tableOperation,
-        updatesAsRetraction = true,
-        withChangeFlag = true,
-        outputType)
-    // Give the DataStream to the TableSink to emit it.
-    sink.consumeDataStream(result)
-  }
-
-  private def writeToAppendSink[T](
-      sink: AppendStreamTableSink[T],
-      tableOperation: QueryOperation)
-    : DataStreamSink[_]= {
-    // optimize plan
-    val relNode = getRelBuilder.tableOperation(tableOperation).build()
-    val optimizedPlan = optimizer.optimize(relNode, updatesAsRetraction = false, getRelBuilder)
-    // verify table is an insert-only (append-only) table
-    if (!UpdatingPlanChecker.isAppendOnly(optimizedPlan)) {
-      throw new TableException(
-        "AppendStreamTableSink requires that Table has only insert changes.")
-    }
-    val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
-      .asInstanceOf[TypeInformation[T]]
-    val resultType = getTableSchema(tableOperation.getTableSchema.getFieldNames, optimizedPlan)
-    // translate the Table into a DataStream and provide the type that the TableSink expects.
-    val result: DataStream[T] =
-      translateOptimized(
-        optimizedPlan,
-        resultType,
-        outputType,
-        withChangeFlag = false)
-    // Give the DataStream to the TableSink to emit it.
-    sink.consumeDataStream(result)
-  }
-
-  private def writeToUpsertSink[T](
-      sink: UpsertStreamTableSink[T],
-      tableOperation: QueryOperation)
-    : DataStreamSink[_] = {
-    // optimize plan
-    val relNode = getRelBuilder.tableOperation(tableOperation).build()
-    val optimizedPlan = optimizer.optimize(relNode, updatesAsRetraction = false, getRelBuilder)
-    // check for append only table
-    val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(optimizedPlan)
-    sink.setIsAppendOnly(isAppendOnlyTable)
-    // extract unique key fields
-    val sinkFieldNames = sink.getTableSchema.getFieldNames
-    val tableKeys: Option[Array[String]] = UpdatingPlanChecker
-      .getUniqueKeyFields(optimizedPlan, sinkFieldNames)
-    // check that we have keys if the table has changes (is not append-only)
-    tableKeys match {
-      case Some(keys) => sink.setKeyFields(keys)
-      case None if isAppendOnlyTable => sink.setKeyFields(null)
-      case None if !isAppendOnlyTable => throw new TableException(
-        "UpsertStreamTableSink requires that Table has full primary keys if it is updated.")
-    }
-    val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
-      .asInstanceOf[TypeInformation[JTuple2[JBool, T]]]
-    val resultType = getTableSchema(tableOperation.getTableSchema.getFieldNames, optimizedPlan)
-    // translate the Table into a DataStream and provide the type that the TableSink expects.
-    val result: DataStream[JTuple2[JBool, T]] =
-      translateOptimized(
-        optimizedPlan,
-        resultType,
-        outputType,
-        withChangeFlag = true)
-    // Give the DataStream to the TableSink to emit it.
-    sink.consumeDataStream(result)
-  }
-
-  private def translateToType[A](
-      table: QueryOperation,
-      updatesAsRetraction: Boolean,
-      withChangeFlag: Boolean,
-      tpe: TypeInformation[A])
-    : DataStream[A] = {
-    val relNode = getRelBuilder.tableOperation(table).build()
-    val dataStreamPlan = optimizer.optimize(relNode, updatesAsRetraction, getRelBuilder)
-    val rowType = getTableSchema(table.getTableSchema.getFieldNames, dataStreamPlan)
-
-    // if no change flags are requested, verify table is an insert-only (append-only) table.
-    if (!withChangeFlag && !UpdatingPlanChecker.isAppendOnly(dataStreamPlan)) {
-      throw new ValidationException(
-        "Table is not an append-only table. " +
-          "Use the toRetractStream() in order to handle add and retract messages.")
-    }
-
-    // get CRow plan
-    translateOptimized(dataStreamPlan, rowType, tpe, withChangeFlag)
-  }
-
-  private def translateOptimized[A](
-      optimizedPlan: RelNode,
-      logicalSchema: TableSchema,
-      tpe: TypeInformation[A],
-      withChangeFlag: Boolean)
-    : DataStream[A] = {
-    val dataStream = translateToCRow(optimizedPlan)
-    DataStreamConversions.convert(dataStream, logicalSchema, withChangeFlag, tpe, config)
-  }
-
-  /**
-    * Returns the record type of the optimized plan with field names of the logical plan.
-    */
-  private def getTableSchema(originalNames: Array[String], optimizedPlan: RelNode): TableSchema = {
-    val fieldTypes = optimizedPlan.getRowType.getFieldList.asScala.map(_.getType)
-      .map(FlinkTypeFactory.toTypeInfo)
-      .map(TypeConversions.fromLegacyInfoToDataType)
-      .toArray
-
-    TableSchema.builder().fields(originalNames, fieldTypes).build()
+    val input = getRelBuilder.tableOperation(tableOperation).build()
+    (LogicalSink.create(input, sink, sinkName), updatesAsRetraction)
   }
 
   private def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
new file mode 100644
index 0000000..ba2b132
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{TableException, TableSchema}
+import org.apache.flink.table.operations.OutputConversionModifyOperation
+
+/**
+  * A special [[TableSink]] that represents information of [[OutputConversionModifyOperation]].
+  *
+  * @param outputType The [[TypeInformation]] that specifies the type of the [[DataStream]].
+  * @param withChangeFlag Set to true to emit records with change flags.
+  * @tparam T The type of the resulting [[DataStream]].
+  */
+@Internal
+class DataStreamTableSink[T](
+    tableSchema: TableSchema,
+    outputType: TypeInformation[T],
+    val withChangeFlag: Boolean)
+  extends TableSink[T] {
+
+  /**
+    * Return the type expected by this [[TableSink]].
+    *
+    * This type should depend on the types returned by [[getTableSchema]].
+    *
+    * @return The type expected by this [[TableSink]].
+    */
+  override def getOutputType: TypeInformation[T] = outputType
+
+  override def getTableSchema: TableSchema = tableSchema
+
+  override def configure(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
+    throw new TableException(s"configure is not supported.")
+  }
+
+}
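
A purely illustrative Scala sketch of how this sink is parameterized (the planner normally builds it internally when translating an OutputConversionModifyOperation; the field name and types below are assumptions, not part of this commit):

    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.api.{DataTypes, TableSchema}
    import org.apache.flink.table.sinks.DataStreamTableSink
    import org.apache.flink.types.Row

    // logical schema of the result and the physical record type handed to the DataStream
    val schema = TableSchema.builder()
      .field("first", DataTypes.STRING())
      .build()
    val sink = new DataStreamTableSink[Row](
      schema,
      new RowTypeInfo(Types.STRING),  // TypeInformation expected by the caller
      withChangeFlag = false)         // append-only conversion, no retract flag
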
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index 6a43957..c5e90bf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -221,9 +221,7 @@ class TableEnvironmentITCase(tableEnvName: String) {
     }
     val streamEnv = ScalaStreamExecutionEnvironment.getExecutionEnvironment
     val streamTableEnv = ScalaStreamTableEnvironment.create(streamEnv, settings)
-    val t = streamEnv.fromCollection(getPersonData)
-      .toTable(streamTableEnv, 'first, 'id, 'score, 'last)
-    streamTableEnv.registerTable("MyTable", t)
+    streamTableEnv.registerTableSource("MyTable", getPersonCsvTableSource)
     val sink1Path = registerCsvTableSink(streamTableEnv, Array("first"), Array(STRING), "MySink1")
     checkEmptyFile(sink1Path)
     StreamITCase.clear
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 62f6da3..14ba09b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -18,11 +18,14 @@
 
 package org.apache.flink.table.api.stream
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{EnvironmentSettings, Table}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil.streamTableNode
+import org.apache.flink.table.api.{EnvironmentSettings, Table, Types}
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId, streamTableNode}
 import org.apache.flink.test.util.AbstractTestBase
 
 import org.junit.Assert.assertEquals
@@ -30,8 +33,6 @@ import org.junit._
 
 class ExplainTest extends AbstractTestBase {
 
-  private val testFilePath = this.getClass.getResource("/").getFile
-
   @Test
   def testFilter(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -41,10 +42,9 @@ class ExplainTest extends AbstractTestBase {
     val scan = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
     val table = scan.filter($"a" % 2 === 0)
 
-    val result = replaceString(tEnv.explain(table))
+    val result = replaceStageId(tEnv.explain(table))
 
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilterStream0.out").mkString
+    val source = readFromResource("testFilterStream0.out")
     val expect = replaceString(source, scan)
     assertEquals(expect, result)
   }
@@ -59,34 +59,69 @@ class ExplainTest extends AbstractTestBase {
     val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
     val table = table1.unionAll(table2)
 
-    val result = replaceString(tEnv.explain(table))
+    val result = replaceStageId(tEnv.explain(table))
 
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnionStream0.out").mkString
+    val source = readFromResource("testUnionStream0.out")
     val expect = replaceString(source, table1, table2)
     assertEquals(expect, result)
   }
 
+  @Test
+  def testInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
+    val tEnv = StreamTableEnvironment.create(env, settings)
+
+    tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))
+
+    tEnv.sqlUpdate("INSERT INTO targetTable SELECT first, id FROM sourceTable")
+
+    val result = tEnv.explain(false)
+    val source = readFromResource("testInsert.out")
+    assertEquals(replaceStageId(source), replaceStageId(result))
+  }
+
+  @Test
+  def testMultipleInserts(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
+    val tEnv = StreamTableEnvironment.create(env, settings)
+
+    tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable1", sink.configure(fieldNames, fieldTypes))
+    tEnv.registerTableSink("targetTable2", sink.configure(fieldNames, fieldTypes))
+
+    tEnv.sqlUpdate("INSERT INTO targetTable1 SELECT first, id FROM sourceTable")
+    tEnv.sqlUpdate("INSERT INTO targetTable2 SELECT last, id FROM sourceTable")
+
+    val result = tEnv.explain(false)
+    val source = readFromResource("testMultipleInserts.out")
+    assertEquals(replaceStageId(source), replaceStageId(result))
+  }
+
   def replaceString(s: String, t1: Table, t2: Table): String = {
-    replaceSourceNode(replaceSourceNode(replaceString(s), t1, 0), t2, 1)
+    replaceSourceNode(replaceSourceNode(replaceStageId(s), t1, 0), t2, 1)
   }
 
   def replaceString(s: String, t: Table): String = {
-    replaceSourceNode(replaceString(s), t, 0)
+    replaceSourceNode(replaceStageId(s), t, 0)
   }
 
-  private def replaceSourceNode(s: String, t: Table, idx: Int) = {
-    replaceString(s)
+  private def replaceSourceNode(s: String, t: Table, idx: Int): String = {
+    replaceStageId(s)
       .replace(
         s"%logicalSourceNode$idx%", streamTableNode(t)
           .replace("DataStreamScan", "FlinkLogicalDataStreamScan"))
       .replace(s"%sourceNode$idx%", streamTableNode(t))
   }
 
-  def replaceString(s: String) = {
-    /* Stage {id} is ignored, because id keeps incrementing in test class
-     * while StreamExecutionEnvironment is up
-     */
-    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
-  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index e2b916a..66b9c7f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -46,6 +46,7 @@ import org.junit.{ComparisonFailure, Rule}
 import org.mockito.Mockito.{mock, when}
 
 import _root_.scala.util.control.Breaks._
+import scala.io.Source
 
 /**
   * Test base for testing Table API / SQL plans.
@@ -214,6 +215,15 @@ object TableTestUtil {
 
     s"DataStreamScan(id=[$id], fields=[${fieldNames.mkString(", ")}])"
   }
+
+  def readFromResource(file: String): String = {
+    val source = s"${getClass.getResource("/").getFile}../../src/test/scala/resources/$file"
+    Source.fromFile(source).mkString
+  }
+
+  def replaceStageId(s: String): String = {
+    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
+  }
 }
 
 case class BatchTableTestUtil(
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testFromToDataStreamAndSqlUpdate.out b/flink-table/flink-table-planner/src/test/scala/resources/testFromToDataStreamAndSqlUpdate.out
index 9cc6251..76c0638 100644
--- a/flink-table/flink-table-planner/src/test/scala/resources/testFromToDataStreamAndSqlUpdate.out
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testFromToDataStreamAndSqlUpdate.out
@@ -1,21 +1,22 @@
 == Abstract Syntax Tree ==
-LogicalProject(first=[$0])
-  FlinkLogicalDataStreamScan(fields=[first, id, score, last])
+LogicalSink(name=[default_catalog.default_database.MySink1], fields=[first])
+  LogicalProject(first=[$0])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 
 == Optimized Logical Plan ==
-DataStreamCalc(select=[first])
-  DataStreamScan(fields=[first, id, score, last])
+DataStreamSink(name=[default_catalog.default_database.MySink1], fields=[first])
+  StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[first], source=[CsvTableSource(read fields: first)])
 
 == Physical Execution Plan ==
  : Data Source
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : from: (first, id, score, last)
+		content : CsvTableSource(read fields: first)
 		ship_strategy : REBALANCE
 
 		 : Operator
-			content : where: (>(id, 0)), select: (last)
+			content : Map
 			ship_strategy : FORWARD
 
 			 : Operator
@@ -23,14 +24,10 @@ DataStreamCalc(select=[first])
 				ship_strategy : FORWARD
 
 				 : Operator
-					content : from: (first, id, score, last)
+					content : Map
 					ship_strategy : REBALANCE
 
-					 : Operator
-						content : select: (first)
+					 : Data Sink
+						content : Sink: CsvTableSink(first)
 						ship_strategy : FORWARD
 
-						 : Data Sink
-							content : Sink: Unnamed
-							ship_strategy : FORWARD
-
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testInsert.out b/flink-table/flink-table-planner/src/test/scala/resources/testInsert.out
new file mode 100644
index 0000000..bddbee2
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testInsert.out
@@ -0,0 +1,29 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+  StreamTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : CsvTableSource(read fields: first, id)
+		ship_strategy : REBALANCE
+
+		 : Operator
+			content : Map
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : to: Row
+				ship_strategy : FORWARD
+
+				 : Data Sink
+					content : Sink: UnsafeMemoryAppendTableSink(d, e)
+					ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts.out b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts.out
new file mode 100644
index 0000000..0bb4d0f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts.out
@@ -0,0 +1,55 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.targetTable1], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+LogicalSink(name=[default_catalog.default_database.targetTable2], fields=[d, e])
+  LogicalProject(last=[$3], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.targetTable1], fields=[d, e])
+  StreamTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+DataStreamSink(name=[default_catalog.default_database.targetTable2], fields=[d, e])
+  StreamTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[last, id], source=[CsvTableSource(read fields: last, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : CsvTableSource(read fields: first, id)
+		ship_strategy : REBALANCE
+
+		 : Operator
+			content : Map
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : to: Row
+				ship_strategy : FORWARD
+
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : CsvTableSource(read fields: last, id)
+		ship_strategy : REBALANCE
+
+		 : Operator
+			content : Map
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : to: Row
+				ship_strategy : FORWARD
+
+				 : Data Sink
+					content : Sink: UnsafeMemoryAppendTableSink(d, e)
+					ship_strategy : FORWARD
+
+					 : Data Sink
+						content : Sink: UnsafeMemoryAppendTableSink(d, e)
+						ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out b/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out
index 40fce87..76c0638 100644
--- a/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out
@@ -1,9 +1,11 @@
 == Abstract Syntax Tree ==
-LogicalProject(first=[$0])
-  LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+LogicalSink(name=[default_catalog.default_database.MySink1], fields=[first])
+  LogicalProject(first=[$0])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 
 == Optimized Logical Plan ==
-StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[first], source=[CsvTableSource(read fields: first)])
+DataStreamSink(name=[default_catalog.default_database.MySink1], fields=[first])
+  StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[first], source=[CsvTableSource(read fields: first)])
 
 == Physical Execution Plan ==
  : Data Source
@@ -17,3 +19,15 @@ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fiel
 			content : Map
 			ship_strategy : FORWARD
 
+			 : Operator
+				content : to: Row
+				ship_strategy : FORWARD
+
+				 : Operator
+					content : Map
+					ship_strategy : REBALANCE
+
+					 : Data Sink
+						content : Sink: CsvTableSink(first)
+						ship_strategy : FORWARD
+


[flink] 04/05: [FLINK-17267] [table] Introduce TableEnvironment#explainSql api

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a5fc5f28a1c3569da8de953f6cd1fad5371f6a4
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Apr 29 10:57:32 2020 +0800

    [FLINK-17267] [table] Introduce TableEnvironment#explainSql api
---
 flink-python/pyflink/table/__init__.py             |  4 +-
 flink-python/pyflink/table/explain_detail.py       | 34 ++++++++++++
 flink-python/pyflink/table/table_environment.py    | 20 ++++++-
 .../table/tests/test_table_environment_api.py      | 29 +++++++++-
 flink-python/pyflink/util/utils.py                 | 20 +++++++
 .../org/apache/flink/table/api/ExplainDetail.java} | 45 +++++-----------
 .../apache/flink/table/api/TableEnvironment.java   | 15 +++++-
 .../table/api/internal/TableEnvironmentImpl.java   | 35 +++++++++++--
 .../org/apache/flink/table/delegation/Planner.java |  7 +--
 .../org/apache/flink/table/utils/PlannerMock.java  |  3 +-
 .../table/planner/delegation/BatchPlanner.scala    |  8 +--
 .../table/planner/delegation/StreamPlanner.scala   | 11 ++--
 .../resources/explain/testExplainSqlWithInsert.out | 31 +++++++++++
 .../resources/explain/testExplainSqlWithSelect.out | 21 ++++++++
 .../flink/table/api/TableEnvironmentTest.scala     | 57 ++++++++++++++++++++
 .../flink/table/planner/utils/TableTestBase.scala  |  2 +-
 .../table/api/internal/BatchTableEnvImpl.scala     | 20 +++++--
 .../flink/table/api/internal/TableEnvImpl.scala    | 18 +++++--
 .../apache/flink/table/planner/StreamPlanner.scala |  2 +-
 .../api/batch/BatchTableEnvironmentTest.scala      | 61 +++++++++++++++++++++-
 .../api/stream/StreamTableEnvironmentTest.scala    | 58 ++++++++++++++++++++
 .../flink/table/utils/MockTableEnvironment.scala   |  4 +-
 .../scala/resources/testExplainSqlWithInsert0.out  | 31 +++++++++++
 .../scala/resources/testExplainSqlWithInsert1.out  | 43 +++++++++++++++
 .../scala/resources/testExplainSqlWithSelect0.out  | 21 ++++++++
 .../scala/resources/testExplainSqlWithSelect1.out  | 27 ++++++++++
 26 files changed, 562 insertions(+), 65 deletions(-)
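
For orientation before the diffs, a minimal Scala sketch of how the new method is meant to be called (assembled from the blink planner test added further down; the COLLECTION table definition mirrors that test and is illustrative, not part of this summary):

    import org.apache.flink.table.api.{EnvironmentSettings, ExplainDetail, TableEnvironment}

    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    // register a source the same way the tests below do
    tEnv.executeSql(
      """
        |CREATE TABLE MyTable (
        |  a BIGINT,
        |  b INT,
        |  c VARCHAR
        |) WITH (
        |  'connector' = 'COLLECTION',
        |  'is-bounded' = 'false'
        |)
      """.stripMargin)

    // AST, optimized plan and physical plan for a single statement;
    // extra details (estimated cost, changelog traits) are opt-in varargs
    val plan = tEnv.explainSql(
      "SELECT * FROM MyTable WHERE a > 10", ExplainDetail.CHANGELOG_TRAITS)
    println(plan)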

diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 140c6b3..1e367f3 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -70,6 +70,7 @@ from pyflink.table.sources import TableSource, CsvTableSource
 from pyflink.table.types import DataTypes, UserDefinedType, Row
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.udf import FunctionContext, ScalarFunction
+from pyflink.table.explain_detail import ExplainDetail
 
 __all__ = [
     'TableEnvironment',
@@ -93,5 +94,6 @@ __all__ = [
     'TableSchema',
     'FunctionContext',
     'ScalarFunction',
-    'SqlDialect'
+    'SqlDialect',
+    'ExplainDetail'
 ]
diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py
new file mode 100644
index 0000000..48e7ce9
--- /dev/null
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -0,0 +1,34 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+__all__ = ['ExplainDetail']
+
+
+class ExplainDetail(object):
+    """
+    ExplainDetail defines the types of details for explain result.
+    """
+
+    # The cost information on physical rel node estimated by optimizer.
+    # e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network,
+    # 0.0 memory}
+    ESTIMATED_COST = 0
+
+    # The changelog traits produced by a physical rel node.
+    # e.g. GroupAggregate(..., changelogMode=[I,UA,D])
+    CHANGELOG_TRAITS = 1
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index d8e8c51..94ff785 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -36,7 +36,8 @@ from pyflink.table import Table
 from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \
     _infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema
 from pyflink.util import utils
-from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class
+from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class, \
+    to_j_explain_detail_arr
 
 __all__ = [
     'BatchTableEnvironment',
@@ -468,6 +469,23 @@ class TableEnvironment(object):
         else:
             return self._j_tenv.explain(table._j_table, extended)
 
+    def explain_sql(self, stmt, *extra_details):
+        """
+        Returns the AST of the specified statement and the execution plan to compute
+        the result of the given statement.
+
+        :param stmt: The statement for which the AST and execution plan will be returned.
+        :type stmt: str
+        :param extra_details: The extra explain details which the explain result should include,
+                              e.g. estimated cost, change log trait for streaming
+        :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
+        :return: The AST of the specified statement and the execution plan.
+        :rtype: str
+        """
+
+        j_extra_details = to_j_explain_detail_arr(extra_details)
+        return self._j_tenv.explainSql(stmt, j_extra_details)
+
     def sql_query(self, query):
         """
         Evaluates a SQL query on registered tables and retrieves the result as a
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 87c8023..bd279af 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -35,8 +35,8 @@ from pyflink.table.types import RowType
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase, \
     PyFlinkBlinkBatchTableTestCase
-from pyflink.util.exceptions import TableException
 from pyflink.util.utils import get_j_env_configuration
+from pyflink.table.explain_detail import ExplainDetail
 
 
 class TableEnvironmentTest(object):
@@ -242,6 +242,33 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
         actual = t_env.explain(extended=True)
         assert isinstance(actual, str)
 
+    def test_explain_sql_without_explain_detail(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+
+        result = t_env.explain_sql("select a + 1, b, c from %s" % source)
+
+        assert isinstance(result, str)
+
+    def test_explain_sql_with_explain_detail(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+
+        result = t_env.explain_sql(
+            "select a + 1, b, c from %s" % source, ExplainDetail.ESTIMATED_COST)
+
+        assert isinstance(result, str)
+
     def test_create_table_environment(self):
         table_config = TableConfig()
         table_config.set_max_generated_code_length(32000)
diff --git a/flink-python/pyflink/util/utils.py b/flink-python/pyflink/util/utils.py
index 89db742..29a20da 100644
--- a/flink-python/pyflink/util/utils.py
+++ b/flink-python/pyflink/util/utils.py
@@ -125,3 +125,23 @@ def add_jars_to_context_class_loader(jar_urls):
     new_classloader = gateway.jvm.java.net.URLClassLoader(
         to_jarray(gateway.jvm.java.net.URL, j_urls), context_classloader)
     gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)
+
+
+def to_j_explain_detail_arr(p_extra_details):
+    # Sphinx reports an "import loop" when generating the docs,
+    # so use a local import here to avoid that error
+    from pyflink.table.explain_detail import ExplainDetail
+    gateway = get_gateway()
+
+    def to_j_explain_detail(p_extra_detail):
+        if p_extra_detail == ExplainDetail.CHANGELOG_TRAITS:
+            return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_TRAITS
+        else:
+            return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
+
+    _len = len(p_extra_details) if p_extra_details else 0
+    j_arr = gateway.new_array(gateway.jvm.org.apache.flink.table.api.ExplainDetail, _len)
+    for i in range(0, _len):
+        j_arr[i] = to_j_explain_detail(p_extra_details[i])
+
+    return j_arr
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
similarity index 50%
copy from flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
index 92f50ff..6e9d014 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
@@ -16,38 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.utils;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.operations.ModifyOperation;
-import org.apache.flink.table.operations.Operation;
-
-import java.util.List;
+package org.apache.flink.table.api;
 
 /**
- * Mocking {@link Planner} for tests.
+ * ExplainDetail defines the types of details for explain result.
  */
-public class PlannerMock implements Planner {
-
-	@Override
-	public Parser getParser() {
-		return new ParserMock();
-	}
-
-	@Override
-	public List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
-		return null;
-	}
-
-	@Override
-	public String explain(List<Operation> operations, boolean extended) {
-		return null;
-	}
-
-	@Override
-	public String[] getCompletionHints(String statement, int position) {
-		return new String[0];
-	}
+public enum ExplainDetail {
+	/**
+	 * The cost information on physical rel node estimated by optimizer.
+	 * e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}
+	 */
+	ESTIMATED_COST,
+
+	/**
+	 * The changelog traits produced by a physical rel node.
+	 * e.g. GroupAggregate(..., changelogMode=[I,UA,D])
+	 */
+	CHANGELOG_TRAITS
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index d855b21..12d21ec 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -828,7 +828,7 @@ public interface TableEnvironment {
 	 * the result of the given {@link Table}.
 	 *
 	 * @param table The table for which the AST and execution plan will be returned.
-	 * @param extended if the plan should contain additional properties such as
+	 * @param extended if the plan should contain additional properties,
 	 * e.g. estimated cost, traits
 	 */
 	String explain(Table table, boolean extended);
@@ -837,12 +837,23 @@ public interface TableEnvironment {
 	 * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
 	 * the result of multiple-sinks plan.
 	 *
-	 * @param extended if the plan should contain additional properties such as
+	 * @param extended if the plan should contain additional properties,
 	 * e.g. estimated cost, traits
 	 */
 	String explain(boolean extended);
 
 	/**
+	 * Returns the AST of the specified statement and the execution plan to compute
+	 * the result of the given statement.
+	 *
+	 * @param statement The statement for which the AST and execution plan will be returned.
+	 * @param extraDetails The extra explain details which the explain result should include,
+	 *   e.g. estimated cost, change log trait for streaming
+	 * @return AST and the execution plan.
+	 */
+	String explainSql(String statement, ExplainDetail... extraDetails);
+
+	/**
 	 * Returns completion hints for the given statement at the given cursor position.
 	 * The completion happens case insensitively.
 	 *
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 1ca045b..610627c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.ResultKind;
 import org.apache.flink.table.api.SqlParserException;
 import org.apache.flink.table.api.Table;
@@ -136,6 +137,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	protected final FunctionCatalog functionCatalog;
 	protected final Planner planner;
 	protected final Parser parser;
+	private final boolean isStreamingMode;
 	private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
 			"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
 			"INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " +
@@ -179,6 +181,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		this.functionCatalog = functionCatalog;
 		this.planner = planner;
 		this.parser = planner.getParser();
+		this.isStreamingMode = isStreamingMode;
 		this.operationTreeBuilder = OperationTreeBuilder.create(
 			tableConfig,
 			functionCatalog.asLookup(parser::parseIdentifier),
@@ -589,14 +592,25 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 
 	@Override
 	public String explain(Table table, boolean extended) {
-		return planner.explain(Collections.singletonList(table.getQueryOperation()), extended);
+		return planner.explain(Collections.singletonList(table.getQueryOperation()), getExplainDetails(extended));
 	}
 
 	@Override
 	public String explain(boolean extended) {
 		List<Operation> operations = bufferedModifyOperations.stream()
-			.map(o -> (Operation) o).collect(Collectors.toList());
-		return planner.explain(operations, extended);
+				.map(o -> (Operation) o).collect(Collectors.toList());
+		return planner.explain(operations, getExplainDetails(extended));
+	}
+
+	@Override
+	public String explainSql(String statement, ExplainDetail... extraDetails) {
+		List<Operation> operations = parser.parse(statement);
+
+		if (operations.size() != 1) {
+			throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
+		}
+
+		return planner.explain(operations, extraDetails);
 	}
 
 	@Override
@@ -854,7 +868,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		} else if (operation instanceof ShowViewsOperation) {
 			return buildShowResult(listViews());
 		} else if (operation instanceof ExplainOperation) {
-			String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()), false);
+			String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()));
 			return TableResultImpl.builder()
 					.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
 					.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
@@ -979,6 +993,19 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		bufferedModifyOperations.addAll(modifyOperations);
 	}
 
+	@VisibleForTesting
+	protected ExplainDetail[] getExplainDetails(boolean extended) {
+		if (extended) {
+			if (isStreamingMode) {
+				return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS };
+			} else {
+				return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST };
+			}
+		} else {
+			return new ExplainDetail[0];
+		}
+	}
+
 	private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
 		validateTableSource(tableSource);
 		ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(name));
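
The pre-existing explain(Table, boolean) and explain(boolean) entry points are kept and routed through the same planner call; a standalone restatement of the boolean-to-detail mapping above, for quick reference (a sketch, not code from this commit):

    import org.apache.flink.table.api.ExplainDetail

    // extended = true asks for cost estimates, plus changelog traits in streaming mode
    def detailsFor(extended: Boolean, isStreamingMode: Boolean): Array[ExplainDetail] =
      if (!extended) {
        Array.empty
      } else if (isStreamingMode) {
        Array(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS)
      } else {
        Array(ExplainDetail.ESTIMATED_COST)
      }
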
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
index 5bb9266..d926e3a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
@@ -79,10 +80,10 @@ public interface Planner {
 	 *
 	 * @param operations The collection of relational queries for which the AST
 	 * and execution plan will be returned.
-	 * @param extended if the plan should contain additional properties such as
-	 * e.g. estimated cost, traits
+	 * @param extraDetails The extra explain details which the explain result should include,
+	 *   e.g. estimated cost, change log trait for streaming
 	 */
-	String explain(List<Operation> operations, boolean extended);
+	String explain(List<Operation> operations, ExplainDetail... extraDetails);
 
 	/**
 	 * Returns completion hints for the given statement at the given cursor position.
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
index 92f50ff..42b5403 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.utils;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.operations.ModifyOperation;
@@ -42,7 +43,7 @@ public class PlannerMock implements Planner {
 	}
 
 	@Override
-	public String explain(List<Operation> operations, boolean extended) {
+	public String explain(List<Operation> operations, ExplainDetail... extraDetails) {
 		return null;
 	}
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 9161753..f97e015 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
 import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
@@ -78,7 +78,7 @@ class BatchPlanner(
     }
   }
 
-  override def explain(operations: util.List[Operation], extended: Boolean): String = {
+  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
@@ -122,10 +122,10 @@ class BatchPlanner(
 
     sb.append("== Optimized Logical Plan ==")
     sb.append(System.lineSeparator)
-    val explainLevel = if (extended) {
+    val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) {
       SqlExplainLevel.ALL_ATTRIBUTES
     } else {
-      SqlExplainLevel.DIGEST_ATTRIBUTES
+      SqlExplainLevel.EXPPLAN_ATTRIBUTES
     }
     sb.append(ExecNodePlanDumper.dagToString(execNodes, explainLevel))
     sb.append(System.lineSeparator)
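
Both blink planners now derive their explain configuration from the detail varargs instead of the extended flag; restated as a standalone sketch for reference (the StreamPlanner change in the next diff adds the changelog switch; this is not code from the commit):

    import org.apache.calcite.sql.SqlExplainLevel
    import org.apache.flink.table.api.ExplainDetail

    // ESTIMATED_COST selects the rel explain level; CHANGELOG_TRAITS (stream only)
    // decides whether changelogMode annotations are printed in the optimized plan
    def explainConfig(
        extraDetails: Seq[ExplainDetail],
        streaming: Boolean): (SqlExplainLevel, Boolean) = {
      val level =
        if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) {
          SqlExplainLevel.ALL_ATTRIBUTES
        } else if (streaming) {
          SqlExplainLevel.DIGEST_ATTRIBUTES
        } else {
          SqlExplainLevel.EXPPLAN_ATTRIBUTES
        }
      (level, streaming && extraDetails.contains(ExplainDetail.CHANGELOG_TRAITS))
    }
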
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 7006533..959de06 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
 import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
@@ -69,7 +69,7 @@ class StreamPlanner(
     }
   }
 
-  override def explain(operations: util.List[Operation], extended: Boolean): String = {
+  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
@@ -109,11 +109,12 @@ class StreamPlanner(
 
     sb.append("== Optimized Logical Plan ==")
     sb.append(System.lineSeparator)
-    val (explainLevel, withChangelogTraits) = if (extended) {
-      (SqlExplainLevel.ALL_ATTRIBUTES, true)
+    val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) {
+      SqlExplainLevel.ALL_ATTRIBUTES
     } else {
-      (SqlExplainLevel.DIGEST_ATTRIBUTES, false)
+      SqlExplainLevel.DIGEST_ATTRIBUTES
     }
+    val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_TRAITS)
     sb.append(ExecNodePlanDumper.dagToString(
       execNodes,
       explainLevel,
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
new file mode 100644
index 0000000..870269f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalFilter(condition=[>($0, 10)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, b], where=[>(a, 10)])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : Source: Custom Source
+
+	 : Operator
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : Calc(select=[a, b], where=[(a > 10)])
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : SinkConversionToRow
+				ship_strategy : FORWARD
+
+				 : Data Sink
+					content : Sink: Unnamed
+					ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
new file mode 100644
index 0000000..0c87ae3
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($0, 10)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[a, b, c], where=[>(a, 10)], changelogMode=[I])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : Source: Custom Source
+
+	 : Operator
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : Calc(select=[a, b, c], where=[(a > 10)])
+			ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index a27b47a..0e197ba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -894,6 +894,63 @@ class TableEnvironmentTest {
     testUnsupportedExplain("explain plan as json for select * from MyTable")
   }
 
+  @Test
+  def testExplainSqlWithSelect(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = tableEnv.explainSql(
+      "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_TRAITS)
+    val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
+  @Test
+  def testExplainSqlWithInsert(): Unit = {
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val actual = tableEnv.explainSql(
+      "insert into MySink select a, b from MyTable where a > 10")
+    val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithInsert.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def testUnsupportedExplain(explain: String): Unit = {
     try {
       tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 4cd0f5d..2d50f43 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1091,7 +1091,7 @@ class TestingTableEnvironment private(
   }
 
   override def explain(extended: Boolean): String = {
-    planner.explain(bufferedOperations.toList, extended)
+    planner.explain(bufferedOperations.toList, getExplainDetails(extended): _*)
   }
 
   @throws[Exception]
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index efc38a5..b3caf20 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -217,16 +217,20 @@ abstract class BatchTableEnvImpl(
     * @param extended Flag to include detailed optimizer estimates.
     */
   private[flink] def explain(table: Table, extended: Boolean): String = {
-    explain(JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), extended)
+    explain(
+      JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]),
+      getExplainDetails(extended): _*)
   }
 
   override def explain(table: Table): String = explain(table: Table, extended = false)
 
   override def explain(extended: Boolean): String = {
-    explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
+    explain(
+      bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava,
+      getExplainDetails(extended): _*)
   }
 
-   protected def explain(operations: JList[Operation], extended: Boolean): String = {
+  protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astList = operations.asScala.map {
       case queryOperation: QueryOperation =>
@@ -285,6 +289,8 @@ abstract class BatchTableEnvImpl(
 
     val env = dataSinks.head.getDataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan
+    // keep the behavior as before
+    val extended = extraDetails.contains(ExplainDetail.ESTIMATED_COST)
     val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
 
     s"== Abstract Syntax Tree ==" +
@@ -597,6 +603,14 @@ abstract class BatchTableEnvImpl(
     TableSchema.builder().fields(originalNames, fieldTypes).build()
   }
 
+  private def getExplainDetails(extended: Boolean): Array[ExplainDetail] = {
+    if (extended) {
+      Array(ExplainDetail.ESTIMATED_COST)
+    } else {
+      Array.empty
+    }
+  }
+
   protected def createDummyBatchTableEnv(): BatchTableEnvImpl
 
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 7c6f144..1f01186 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -762,14 +762,13 @@ abstract class TableEnvImpl(
       case _: ShowViewsOperation =>
         buildShowResult(listViews())
       case explainOperation: ExplainOperation =>
-        val explanation = explain(
-          JCollections.singletonList(explainOperation.getChild),
-          extended = false)
+        val explanation = explain(JCollections.singletonList(explainOperation.getChild))
         TableResultImpl.builder.
           resultKind(ResultKind.SUCCESS_WITH_CONTENT)
           .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
           .data(JCollections.singletonList(Row.of(explanation)))
           .build
+
       case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
     }
   }
@@ -1135,7 +1134,18 @@ abstract class TableEnvImpl(
     }
   }
 
-  protected def explain(operations: JList[Operation], extended: Boolean): String
+  override def explainSql(statement: String, extraDetails: ExplainDetail*): String = {
+    val operations = parser.parse(statement)
+
+    if (operations.size != 1) {
+      throw new TableException(
+        "Unsupported SQL query! explainSql() only accepts a single SQL query.")
+    }
+
+    explain(operations, extraDetails: _*)
+  }
+
+  protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String
 
   override def fromValues(values: Expression*): Table = {
     createTable(operationTreeBuilder.values(values: _*))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 756d9ca..d81ca1c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -120,7 +120,7 @@ class StreamPlanner(
     }.filter(Objects.nonNull).asJava
   }
 
-  override def explain(operations: util.List[Operation], extended: Boolean): String = {
+  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astWithUpdatesAsRetractionTuples = operations.asScala.map {
       case queryOperation: QueryOperation =>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index d09bd5d..c928314 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -24,8 +24,7 @@ import org.apache.flink.table.api.{ResultKind, TableException}
 import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
 import org.apache.flink.table.runtime.stream.sql.FunctionITCase.{SimpleScalarFunction, TestUDF}
 import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId, _}
 import org.apache.flink.types.Row
 
 import org.hamcrest.Matchers.containsString
@@ -447,6 +446,64 @@ class BatchTableEnvironmentTest extends TableTestBase {
       "explain plan as json for select * from MyTable")
   }
 
+  @Test
+  def testExplainSqlWithSelect(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = util.tableEnv.explainSql("select * from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithSelect1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
+  @Test
+  def testExplainSqlWithInsert(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val actual = util.tableEnv.explainSql(
+      "insert into MySink select a, b from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithInsert1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
     try {
       tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 439fadb..25bb536 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -326,6 +326,64 @@ class StreamTableEnvironmentTest extends TableTestBase {
     }
   }
 
+  @Test
+  def testExplainSqlWithSelect(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = util.tableEnv.explainSql("select * from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithSelect0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
+  @Test
+  def testExplainSqlWithInsert(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val actual = util.tableEnv.explainSql(
+      "insert into MySink select a, b from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithInsert0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def prepareSchemaExpressionParser:
     (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 8a9d9c4..312d980 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.utils
 
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableResult}
+import org.apache.flink.table.api.{ExplainDetail, Table, TableConfig, TableEnvironment, TableResult}
 import org.apache.flink.table.catalog.Catalog
 import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.expressions.Expression
@@ -74,6 +74,8 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def explain(extended: Boolean): String = ???
 
+  override def explainSql(statement: String, extraDetails: ExplainDetail*): String = ???
+
   override def getCompletionHints(statement: String, position: Int): Array[String] = ???
 
   override def sqlQuery(query: String): Table = ???
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
new file mode 100644
index 0000000..bbd0f53
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  LogicalProject(a=[$0], b=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  DataStreamCalc(select=[a, b], where=[>(a, 10)])
+    StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : from: (a, b)
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : where: (>(a, 10)), select: (a, b)
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : to: Row
+				ship_strategy : FORWARD
+
+				 : Data Sink
+					content : Sink: Unnamed
+					ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
new file mode 100644
index 0000000..b904056
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
@@ -0,0 +1,43 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  LogicalProject(a=[$0], b=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  DataSetCalc(select=[a, b], where=[>(a, 10)])
+    BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : Map
+		content : from: (a, b)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		 : FlatMap
+			content : where: (>(a, 10)), select: (a, b)
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			driver_strategy : FlatMap
+			Partitioning : RANDOM_PARTITIONED
+
+			 : Map
+				content : to: Row
+				ship_strategy : Forward
+				exchange_mode : PIPELINED
+				driver_strategy : Map
+				Partitioning : RANDOM_PARTITIONED
+
+				 : Data Sink
+					content : org.apache.flink.api.java.io.LocalCollectionOutputFormat
+					ship_strategy : Forward
+					exchange_mode : PIPELINED
+					Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
new file mode 100644
index 0000000..4459ad6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b, c], where=[>(a, 10)])
+  StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : Map
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : where: (>(a, 10)), select: (a, b, c)
+			ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
new file mode 100644
index 0000000..91e87ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b, c], where=[>(a, 10)])
+  BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : FlatMap
+		content : where: (>(a, 10)), select: (a, b, c)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : FlatMap
+		Partitioning : RANDOM_PARTITIONED
+
+		 : Data Sink
+			content : org.apache.flink.api.java.io.DiscardingOutputFormat
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+
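
(All of the .out resources above share the same three-section layout. A small helper sketch, not tied to any Flink API, for pulling a single section out of an explain result when a test only cares about one of them:)

    // Splits an explain()/explainSql() result into its sections, keyed by the
    // "== ... ==" headers, e.g. "Abstract Syntax Tree", "Optimized Logical Plan",
    // "Physical Execution Plan".
    def splitPlan(plan: String): Map[String, String] = {
      val headerPattern = "== (.+) ==".r
      val names = headerPattern.findAllMatchIn(plan).map(_.group(1)).toSeq
      val bodies = plan.split("(?m)^== .+ ==$").map(_.trim).filter(_.nonEmpty)
      names.zip(bodies).toMap
    }

    // Example: compare only the optimized plan of an explainSql() result,
    // e.g. splitPlan(actual)("Optimized Logical Plan")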