You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/08 12:57:30 UTC

[flink] 03/05: [FLINK-17267] [table] TableEnvironment#explainSql supports EXPLAIN statement

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6013a62523bd2fc73b6aa7b3109f6b7c138ac51c
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Apr 22 17:08:00 2020 +0800

    [FLINK-17267] [table] TableEnvironment#explainSql supports EXPLAIN statement
---
 .../table/api/internal/TableEnvironmentImpl.java   |   9 ++
 .../flink/table/operations/ExplainOperation.java   |  46 ++++++++
 .../operations/SqlToOperationConverter.java        |  19 ++++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  11 +-
 .../table/planner/delegation/BatchPlanner.scala    |  22 +++-
 .../table/planner/delegation/StreamPlanner.scala   |  22 +++-
 .../explain/testExecuteSqlWithExplainInsert.out    |  31 +++++
 .../explain/testExecuteSqlWithExplainSelect.out    |  21 ++++
 .../flink/table/api/TableEnvironmentTest.scala     | 115 ++++++++++++++++++-
 .../table/sqlexec/SqlToOperationConverter.java     |  19 ++++
 .../table/api/internal/BatchTableEnvImpl.scala     |  71 ++++++++----
 .../flink/table/api/internal/TableEnvImpl.scala    |  13 ++-
 .../flink/table/calcite/FlinkPlannerImpl.scala     |  11 +-
 .../apache/flink/table/planner/StreamPlanner.scala |  17 ++-
 .../flink/table/api/TableEnvironmentITCase.scala   |  13 +--
 .../api/batch/BatchTableEnvironmentTest.scala      | 120 +++++++++++++++++++-
 .../api/stream/StreamTableEnvironmentTest.scala    | 126 ++++++++++++++++++++-
 .../resources/testExecuteSqlWithExplainInsert0.out |  31 +++++
 .../resources/testExecuteSqlWithExplainInsert1.out |  36 ++++++
 .../resources/testExecuteSqlWithExplainSelect0.out |  21 ++++
 .../resources/testExecuteSqlWithExplainSelect1.out |  27 +++++
 21 files changed, 744 insertions(+), 57 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 342ee55..1ca045b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -72,6 +72,7 @@ import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
@@ -852,6 +853,14 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 			return buildShowResult(listFunctions());
 		} else if (operation instanceof ShowViewsOperation) {
 			return buildShowResult(listViews());
+		} else if (operation instanceof ExplainOperation) {
+			String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()), false);
+			return TableResultImpl.builder()
+					.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+					.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
+					.data(Collections.singletonList(Row.of(explanation)))
+					.build();
+
 		} else {
 			throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
 		}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
new file mode 100644
index 0000000..c78c78f
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import java.util.Collections;
+
+/**
+ * Operation to describe an EXPLAIN statement.
+ * NOTE: currently, only the default behavior (EXPLAIN PLAN FOR xx) is supported.
+ */
+public class ExplainOperation implements Operation {
+	private final Operation child;
+
+	public ExplainOperation(Operation child) {
+		this.child = child;
+	}
+
+	public Operation getChild() {
+		return child;
+	}
+
+	@Override
+	public String asSummaryString() {
+		return OperationUtils.formatWithChildren(
+				"EXPLAIN PLAN FOR",
+				Collections.emptyMap(),
+				Collections.singletonList(child),
+				Operation::asSummaryString);
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index b303570..9f80aa1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -62,6 +62,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
 import org.apache.flink.table.operations.ShowDatabasesOperation;
@@ -98,6 +99,9 @@ import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -195,6 +199,8 @@ public class SqlToOperationConverter {
 			return Optional.of(converter.convertDropView((SqlDropView) validated));
 		} else if (validated instanceof SqlShowViews) {
 			return Optional.of(converter.convertShowViews((SqlShowViews) validated));
+		} else if (validated instanceof SqlExplain) {
+			return Optional.of(converter.convertExplain((SqlExplain) validated));
 		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
 			return Optional.of(converter.convertSqlQuery(validated));
 		} else {
@@ -577,6 +583,19 @@ public class SqlToOperationConverter {
 		return new ShowViewsOperation();
 	}
 
+	/** Convert EXPLAIN statement. */
+	private Operation convertExplain(SqlExplain sqlExplain) {
+		Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
+
+		if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES ||
+				sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL ||
+				sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
+			throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
+		}
+
+		return new ExplainOperation(operation);
+	}
+
 	/** Fallback method for sql query. */
 	private Operation convertSqlQuery(SqlNode node) {
 		return toQueryOperation(flinkPlanner, node);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index c3f72b3..846f536 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -30,7 +30,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
-import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
 import java.lang.{Boolean => JBoolean}
@@ -129,7 +129,14 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowViews]) {
         return sqlNode
       }
-      validator.validate(sqlNode)
+      sqlNode match {
+        case explain: SqlExplain =>
+          val validated = validator.validate(explain.getExplicandum)
+          explain.setOperand(0, validated)
+          explain
+        case _ =>
+          validator.validate(sqlNode)
+      }
     }
     catch {
       case e: RuntimeException =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 2626809..9161753 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.planner.operations.PlannerQueryOperation
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
@@ -32,6 +33,7 @@ import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOp
 import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
 
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
+import org.apache.calcite.rel.logical.LogicalTableModify
 import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
 import org.apache.calcite.sql.SqlExplainLevel
 
@@ -80,7 +82,21 @@ class BatchPlanner(
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
-        getRelBuilder.queryOperation(queryOperation).build()
+        val relNode = getRelBuilder.queryOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation)
+          case _ =>
+            relNode
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation)
       case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index e6245f2..7006533 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.planner.operations.PlannerQueryOperation
 import org.apache.flink.table.planner.plan.`trait`._
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.planner.plan.optimize.{Optimizer, StreamCommonSubGraphBasedOptimizer}
@@ -30,6 +31,7 @@ import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOp
 import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
 
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
+import org.apache.calcite.rel.logical.LogicalTableModify
 import org.apache.calcite.sql.SqlExplainLevel
 
 import java.util
@@ -71,7 +73,21 @@ class StreamPlanner(
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
-        getRelBuilder.queryOperation(queryOperation).build()
+        val relNode = getRelBuilder.queryOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation)
+          case _ =>
+            relNode
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation)
       case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
new file mode 100644
index 0000000..0e5e015
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(d=[$0], e=[$1])
+   +- LogicalFilter(condition=[>($0, 10)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, b], where=[>(a, 10)])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : Source: Custom Source
+
+	 : Operator
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : Calc(select=[a, b], where=[(a > 10)])
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : SinkConversionToRow
+				ship_strategy : FORWARD
+
+				 : Data Sink
+					content : Sink: Unnamed
+					ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
new file mode 100644
index 0000000..4865193
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($0, 10)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[a, b, c], where=[>(a, 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : Source: Custom Source
+
+	 : Operator
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : Calc(select=[a, b, c], where=[(a > 10)])
+			ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index e289edb..a27b47a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.api
 
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.sql.SqlExplainLevel
 import org.apache.flink.api.common.typeinfo.Types.STRING
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
@@ -29,10 +27,14 @@ import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
 import org.apache.flink.table.planner.operations.SqlConversionException
 import org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase.TestUDF
 import org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.SimpleScalarFunction
+import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId
 import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks}
 import org.apache.flink.types.Row
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.sql.SqlExplainLevel
 import org.hamcrest.Matchers.containsString
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.rules.ExpectedException
 import org.junit.{Rule, Test}
 
@@ -797,6 +799,113 @@ class TableEnvironmentTest {
       tableResult5.collect())
   }
 
+  @Test
+  def testExecuteSqlWithExplainSelect(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val tableResult2 = tableEnv.executeSql("explain plan for select * from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    val it = tableResult2.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainSelect.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithExplainInsert(): Unit = {
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val tableResult3 = tableEnv.executeSql(
+      "explain plan for insert into MySink select a, b from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    val it = tableResult3.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainInsert.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithUnsupportedExplain(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    // TODO we can support them later
+    testUnsupportedExplain("explain plan excluding attributes for select * from MyTable")
+    testUnsupportedExplain("explain plan including all attributes for select * from MyTable")
+    testUnsupportedExplain("explain plan with type for select * from MyTable")
+    testUnsupportedExplain("explain plan without implementation for select * from MyTable")
+    testUnsupportedExplain("explain plan as xml for select * from MyTable")
+    testUnsupportedExplain("explain plan as json for select * from MyTable")
+  }
+
+  private def testUnsupportedExplain(explain: String): Unit = {
+    try {
+      tableEnv.executeSql(explain)
+      fail("This should not happen")
+    } catch {
+      case e: TableException =>
+        assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+      case e =>
+        fail("This should not happen, " + e.getMessage)
+    }
+  }
+
   private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
     while (expected.hasNext && actual.hasNext) {
       assertEquals(expected.next(), actual.next())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 8ebb47e..ded0d85 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -61,6 +61,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.PlannerQueryOperation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -91,6 +92,9 @@ import org.apache.flink.util.StringUtils;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -187,6 +191,8 @@ public class SqlToOperationConverter {
 			return Optional.of(converter.convertDropView((SqlDropView) validated));
 		} else if (validated instanceof SqlShowViews) {
 			return Optional.of(converter.convertShowViews((SqlShowViews) validated));
+		} else if (validated instanceof SqlExplain) {
+			return Optional.of(converter.convertExplain((SqlExplain) validated));
 		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
 			return Optional.of(converter.convertSqlQuery(validated));
 		} else {
@@ -534,6 +540,19 @@ public class SqlToOperationConverter {
 		return new ShowViewsOperation();
 	}
 
+	/** Convert EXPLAIN statement. */
+	private Operation convertExplain(SqlExplain sqlExplain) {
+		Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
+
+		if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES ||
+				sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL ||
+				sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
+			throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
+		}
+
+		return new ExplainOperation(operation);
+	}
+
 	/**
 	 * Create a table schema from {@link SqlCreateTable}. This schema contains computed column
 	 * fields, say, we have a create table DDL statement:
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index e25b8e4..efc38a5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -31,14 +31,14 @@ import org.apache.flink.configuration.DeploymentOptions
 import org.apache.flink.core.execution.{DetachedJobExecutionResult, JobClient}
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.{CalciteConfig, FlinkTypeFactory}
-import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogManager}
+import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogManager, ObjectIdentifier}
 import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
 import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
 import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, PlannerQueryOperation, QueryOperation}
 import org.apache.flink.table.plan.BatchOptimizer
 import org.apache.flink.table.plan.nodes.LogicalSink
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
@@ -57,6 +57,7 @@ import org.apache.flink.util.Preconditions.checkNotNull
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalTableModify
 
 import _root_.java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList}
 
@@ -225,11 +226,25 @@ abstract class BatchTableEnvImpl(
     explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
   }
 
-  private def explain(operations: JList[Operation], extended: Boolean): String = {
+   protected def explain(operations: JList[Operation], extended: Boolean): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astList = operations.asScala.map {
       case queryOperation: QueryOperation =>
-        getRelBuilder.tableOperation(queryOperation).build()
+        val relNode = getRelBuilder.tableOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation, addLogicalSink = true)
+          case _ =>
+            relNode
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation, addLogicalSink = true)
       case o => throw new TableException(s"Unsupported operation: ${o.asSummaryString()}")
@@ -237,27 +252,33 @@ abstract class BatchTableEnvImpl(
 
     val optimizedNodes = astList.map(optimizer.optimize)
 
-    val batchTableEnv = createDummyBatchTableEnv()
-    val dataSinks = optimizedNodes.zip(operations.asScala).map {
-      case (optimizedNode, operation) =>
-        operation match {
-          case queryOperation: QueryOperation =>
-            val dataSet = translate[Row](
-              optimizedNode,
-              getTableSchema(queryOperation.getTableSchema.getFieldNames, optimizedNode))(
-              new GenericTypeInfo(classOf[Row]))
-            dataSet.output(new DiscardingOutputFormat[Row])
-          case modifyOperation: ModifyOperation =>
-            val tableSink = getTableSink(modifyOperation)
-            translate(
-              batchTableEnv,
-              optimizedNode,
-              tableSink,
-              getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
-          case o =>
-            throw new TableException("Unsupported Operation: " + o.asSummaryString())
-        }
-    }
+     val batchTableEnv = createDummyBatchTableEnv()
+     val dataSinks = optimizedNodes.zip(operations.asScala).map {
+       case (optimizedNode, operation) =>
+         operation match {
+           case queryOperation: QueryOperation =>
+             val fieldNames = queryOperation match {
+               case o: PlannerQueryOperation if o.getCalciteTree.isInstanceOf[LogicalTableModify] =>
+                 o.getCalciteTree.getInput(0).getRowType.getFieldNames.asScala.toArray[String]
+               case _ =>
+                 queryOperation.getTableSchema.getFieldNames
+             }
+             val dataSet = translate[Row](
+               optimizedNode,
+               getTableSchema(fieldNames, optimizedNode))(
+               new GenericTypeInfo(classOf[Row]))
+             dataSet.output(new DiscardingOutputFormat[Row])
+           case modifyOperation: ModifyOperation =>
+             val tableSink = getTableSink(modifyOperation)
+             translate(
+               batchTableEnv,
+               optimizedNode,
+               tableSink,
+               getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+           case o =>
+             throw new TableException("Unsupported Operation: " + o.asSummaryString())
+         }
+     }
 
     val astPlan = astList.map(RelOptUtil.toString).mkString(System.lineSeparator)
     val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3a97106..7c6f144 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -545,8 +545,6 @@ abstract class TableEnvImpl(
 
   override def listFunctions(): Array[String] = functionCatalog.getFunctions
 
-  override def explain(table: Table): String
-
   override def getCompletionHints(statement: String, position: Int): Array[String] = {
     val planner = getFlinkPlanner
     planner.getCompletionHints(statement, position)
@@ -763,6 +761,15 @@ abstract class TableEnvImpl(
         TableResultImpl.TABLE_RESULT_OK
       case _: ShowViewsOperation =>
         buildShowResult(listViews())
+      case explainOperation: ExplainOperation =>
+        val explanation = explain(
+          JCollections.singletonList(explainOperation.getChild),
+          extended = false)
+        TableResultImpl.builder.
+          resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+          .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
+          .data(JCollections.singletonList(Row.of(explanation)))
+          .build
       case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
     }
   }
@@ -1128,6 +1135,8 @@ abstract class TableEnvImpl(
     }
   }
 
+  protected def explain(operations: JList[Operation], extended: Boolean): String
+
   override def fromValues(values: Expression*): Table = {
     createTable(operationTreeBuilder.values(values: _*))
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index b4c6210..3d7dae4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.rel.RelRoot
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.RexBuilder
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
-import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
 import _root_.java.lang.{Boolean => JBoolean}
@@ -127,7 +127,14 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowViews]) {
         return sqlNode
       }
-      validator.validate(sqlNode)
+      sqlNode match {
+        case explain: SqlExplain =>
+          val validated = validator.validate(explain.getExplicandum)
+          explain.setOperand(0, validated)
+          explain
+        case _ =>
+          validator.validate(sqlNode)
+      }
     }
     catch {
       case e: RuntimeException =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index f549168..756d9ca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -43,6 +43,7 @@ import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalTableModify
 
 import _root_.java.util
 import _root_.java.util.Objects
@@ -123,7 +124,21 @@ class StreamPlanner(
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astWithUpdatesAsRetractionTuples = operations.asScala.map {
       case queryOperation: QueryOperation =>
-        (getRelBuilder.tableOperation(queryOperation).build(), false)
+        val relNode = getRelBuilder.tableOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation)
+          case _ =>
+            (relNode, false)
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation)
       case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index c5e90bf..dd83a3e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -24,13 +24,14 @@ import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecutionEnvironment}
-import org.apache.flink.table.api.TableEnvironmentITCase.{getPersonCsvTableSource, getPersonData, readFromResource, replaceStageId}
+import org.apache.flink.table.api.TableEnvironmentITCase.{getPersonCsvTableSource, getPersonData}
 import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.api.java.StreamTableEnvironment
 import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnvironment, _}
 import org.apache.flink.table.runtime.utils.StreamITCase
 import org.apache.flink.table.sinks.CsvTableSink
 import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
 import org.apache.flink.table.utils.TestingOverwritableTableSink
 import org.apache.flink.types.Row
 import org.apache.flink.util.FileUtils
@@ -46,7 +47,6 @@ import _root_.java.lang.{Long => JLong}
 import _root_.java.util
 
 import _root_.scala.collection.mutable
-import _root_.scala.io.Source
 
 
 @RunWith(classOf[Parameterized])
@@ -452,15 +452,6 @@ object TableEnvironmentITCase {
     )
   }
 
-  def readFromResource(file: String): String = {
-    val source = s"${getClass.getResource("/").getFile}../../src/test/scala/resources/$file"
-    Source.fromFile(source).mkString
-  }
-
-  def replaceStageId(s: String): String = {
-    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
-  }
-
   def getPersonCsvTableSource: CsvTableSource = {
     val header = "First#Id#Score#Last"
     val csvRecords = getPersonData.map {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index 9823e08..d09bd5d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -25,10 +25,11 @@ import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
 import org.apache.flink.table.runtime.stream.sql.FunctionITCase.{SimpleScalarFunction, TestUDF}
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
 import org.apache.flink.types.Row
 
 import org.hamcrest.Matchers.containsString
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.Test
 
 import java.util
@@ -341,6 +342,123 @@ class BatchTableEnvironmentTest extends TableTestBase {
       tableResult4.collect())
   }
 
+  @Test
+  def testExecuteSqlWithExplainSelect(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    // EXPLAIN of a query must yield exactly one single-column row holding the plan text
+    val tableResult2 = util.tableEnv.executeSql(
+      "explain plan for select * from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    val it = tableResult2.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = readFromResource("testExecuteSqlWithExplainSelect1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithExplainInsert(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    // MySink is the target table of the INSERT statement explained below
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    // EXPLAIN of an INSERT must yield exactly one single-column row holding the plan text
+    val tableResult3 = util.tableEnv.executeSql(
+      "explain plan for insert into MySink select a, b from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    val it = tableResult3.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = readFromResource("testExecuteSqlWithExplainInsert1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithUnsupportedExplain(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    // TODO: only the default EXPLAIN form is supported for now; these variants may come later
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan excluding attributes for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan including all attributes for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan with type for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan without implementation for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan as xml for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan as json for select * from MyTable")
+  }
+
+  // Asserts that `explain` is rejected with the "Only default behavior is supported" error.
+  private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
+    val rejected = try {
+      tableEnv.executeSql(explain)
+      false
+    } catch {
+      case e: TableException => // any other exception propagates with its stack trace
+        e.getMessage != null && e.getMessage.contains("Only default behavior is supported now")
+    }
+    if (!rejected) fail(s"EXPLAIN did not fail with the expected TableException: $explain")
+  }
+
   private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
     while (expected.hasNext && actual.hasNext) {
       assertEquals(expected.next(), actual.next())
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 809d0aa..439fadb 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -29,20 +29,21 @@ import org.apache.flink.table.api.Expressions.$
 import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => JStreamTableEnvironmentImpl}
 import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{EnvironmentSettings, Expressions, TableConfig, Types, ValidationException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.api.{EnvironmentSettings, ResultKind, TableConfig, TableException, Types, ValidationException}
+import org.apache.flink.table.catalog.FunctionCatalog
 import org.apache.flink.table.executor.StreamExecutor
+import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.TableTestUtil.{binaryNode, readFromResource, replaceStageId, streamTableNode, term, unaryNode}
 import org.apache.flink.table.utils.{CatalogManagerMocks, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
 import org.apache.flink.types.Row
 
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.Test
 import org.mockito.Mockito.{mock, when}
 
 import java.lang.{Integer => JInt, Long => JLong}
-import org.apache.flink.table.module.ModuleManager
 
 class StreamTableEnvironmentTest extends TableTestBase {
 
@@ -208,6 +209,123 @@ class StreamTableEnvironmentTest extends TableTestBase {
     jTEnv.fromDataStream(ds, $("rt").rowtime(), $("b"), $("c"), $("d"), $("e"), $("pt").proctime())
   }
 
+  @Test
+  def testExecuteSqlWithExplainSelect(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    // EXPLAIN of a query must yield exactly one single-column row holding the plan text
+    val tableResult2 = util.tableEnv.executeSql(
+      "explain plan for select * from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    val it = tableResult2.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = readFromResource("testExecuteSqlWithExplainSelect0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithExplainInsert(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+    // MySink is the target table of the INSERT statement explained below
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+    // EXPLAIN of an INSERT must yield exactly one single-column row holding the plan text
+    val tableResult3 = util.tableEnv.executeSql(
+      "explain plan for insert into MySink select a, b from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    val it = tableResult3.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = readFromResource("testExecuteSqlWithExplainInsert0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithUnsupportedExplain(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    // TODO: only the default EXPLAIN form is supported for now; these variants may come later
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan excluding attributes for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan including all attributes for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan with type for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan without implementation for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan as xml for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan as json for select * from MyTable")
+  }
+
+  // Asserts that `explain` is rejected with the "Only default behavior is supported" error.
+  private def testUnsupportedExplain(tableEnv: StreamTableEnvironment, explain: String): Unit = {
+    val rejected = try {
+      tableEnv.executeSql(explain)
+      false
+    } catch {
+      case e: TableException => // any other exception propagates with its stack trace
+        e.getMessage != null && e.getMessage.contains("Only default behavior is supported now")
+    }
+    if (!rejected) fail(s"EXPLAIN did not fail with the expected TableException: $explain")
+  }
+
   private def prepareSchemaExpressionParser:
     (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
new file mode 100644
index 0000000..aad6387
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  LogicalProject(d=[$0], e=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  DataStreamCalc(select=[a, b], where=[>(a, 10)])
+    StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : from: (a, b)
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : where: (>(a, 10)), select: (a, b)
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : to: Row
+				ship_strategy : FORWARD
+
+				 : Data Sink
+					content : Sink: Unnamed
+					ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
new file mode 100644
index 0000000..98206ae
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
@@ -0,0 +1,36 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  LogicalProject(d=[$0], e=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  DataSetCalc(select=[a, b], where=[>(a, 10)])
+    BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : Map
+		content : from: (a, b)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		 : FlatMap
+			content : where: (>(a, 10)), select: (a, b)
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			driver_strategy : FlatMap
+			Partitioning : RANDOM_PARTITIONED
+
+			 : Data Sink
+				content : org.apache.flink.api.java.io.DiscardingOutputFormat
+				ship_strategy : Forward
+				exchange_mode : PIPELINED
+				Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
new file mode 100644
index 0000000..4459ad6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b, c], where=[>(a, 10)])
+  StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : Map
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : where: (>(a, 10)), select: (a, b, c)
+			ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
new file mode 100644
index 0000000..91e87ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b, c], where=[>(a, 10)])
+  BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : FlatMap
+		content : where: (>(a, 10)), select: (a, b, c)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : FlatMap
+		Partitioning : RANDOM_PARTITIONED
+
+		 : Data Sink
+			content : org.apache.flink.api.java.io.DiscardingOutputFormat
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+