You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/08 12:57:31 UTC

[flink] 04/05: [FLINK-17267] [table] Introduce TableEnvironment#explainSql api

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a5fc5f28a1c3569da8de953f6cd1fad5371f6a4
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Apr 29 10:57:32 2020 +0800

    [FLINK-17267] [table] Introduce TableEnvironment#explainSql api
---
 flink-python/pyflink/table/__init__.py             |  4 +-
 flink-python/pyflink/table/explain_detail.py       | 34 ++++++++++++
 flink-python/pyflink/table/table_environment.py    | 20 ++++++-
 .../table/tests/test_table_environment_api.py      | 29 +++++++++-
 flink-python/pyflink/util/utils.py                 | 20 +++++++
 .../org/apache/flink/table/api/ExplainDetail.java} | 45 +++++-----------
 .../apache/flink/table/api/TableEnvironment.java   | 15 +++++-
 .../table/api/internal/TableEnvironmentImpl.java   | 35 +++++++++++--
 .../org/apache/flink/table/delegation/Planner.java |  7 +--
 .../org/apache/flink/table/utils/PlannerMock.java  |  3 +-
 .../table/planner/delegation/BatchPlanner.scala    |  8 +--
 .../table/planner/delegation/StreamPlanner.scala   | 11 ++--
 .../resources/explain/testExplainSqlWithInsert.out | 31 +++++++++++
 .../resources/explain/testExplainSqlWithSelect.out | 21 ++++++++
 .../flink/table/api/TableEnvironmentTest.scala     | 57 ++++++++++++++++++++
 .../flink/table/planner/utils/TableTestBase.scala  |  2 +-
 .../table/api/internal/BatchTableEnvImpl.scala     | 20 +++++--
 .../flink/table/api/internal/TableEnvImpl.scala    | 18 +++++--
 .../apache/flink/table/planner/StreamPlanner.scala |  2 +-
 .../api/batch/BatchTableEnvironmentTest.scala      | 61 +++++++++++++++++++++-
 .../api/stream/StreamTableEnvironmentTest.scala    | 58 ++++++++++++++++++++
 .../flink/table/utils/MockTableEnvironment.scala   |  4 +-
 .../scala/resources/testExplainSqlWithInsert0.out  | 31 +++++++++++
 .../scala/resources/testExplainSqlWithInsert1.out  | 43 +++++++++++++++
 .../scala/resources/testExplainSqlWithSelect0.out  | 21 ++++++++
 .../scala/resources/testExplainSqlWithSelect1.out  | 27 ++++++++++
 26 files changed, 562 insertions(+), 65 deletions(-)

diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 140c6b3..1e367f3 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -70,6 +70,7 @@ from pyflink.table.sources import TableSource, CsvTableSource
 from pyflink.table.types import DataTypes, UserDefinedType, Row
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.udf import FunctionContext, ScalarFunction
+from pyflink.table.explain_detail import ExplainDetail
 
 __all__ = [
     'TableEnvironment',
@@ -93,5 +94,6 @@ __all__ = [
     'TableSchema',
     'FunctionContext',
     'ScalarFunction',
-    'SqlDialect'
+    'SqlDialect',
+    'ExplainDetail'
 ]
diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py
new file mode 100644
index 0000000..48e7ce9
--- /dev/null
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -0,0 +1,34 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+__all__ = ['ExplainDetail']
+
+
+class ExplainDetail(object):
+    """
+    ExplainDetail defines the types of details for explain result.
+    """
+
+    # The cost information on physical rel node estimated by optimizer.
+    # e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network,
+    # 0.0 memory})
+    ESTIMATED_COST = 0
+
+    # The changelog traits produced by a physical rel node.
+    # e.g. GroupAggregate(..., changelogMode=[I,UA,D])
+    CHANGELOG_TRAITS = 1
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index d8e8c51..94ff785 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -36,7 +36,8 @@ from pyflink.table import Table
 from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \
     _infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema
 from pyflink.util import utils
-from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class
+from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class, \
+    to_j_explain_detail_arr
 
 __all__ = [
     'BatchTableEnvironment',
@@ -468,6 +469,23 @@ class TableEnvironment(object):
         else:
             return self._j_tenv.explain(table._j_table, extended)
 
+    def explain_sql(self, stmt, *extra_details):
+        """
+        Returns the AST of the specified statement and the execution plan to compute
+        the result of the given statement.
+
+        :param stmt: The statement for which the AST and execution plan will be returned.
+        :type stmt: str
+        :param extra_details: The extra explain details which the explain result should include,
+                              e.g. estimated cost, change log trait for streaming
+        :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
+        :return: The AST and the execution plan of the given statement.
+        :rtype: str
+        """
+
+        j_extra_details = to_j_explain_detail_arr(extra_details)
+        return self._j_tenv.explainSql(stmt, j_extra_details)
+
     def sql_query(self, query):
         """
         Evaluates a SQL query on registered tables and retrieves the result as a
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 87c8023..bd279af 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -35,8 +35,8 @@ from pyflink.table.types import RowType
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase, \
     PyFlinkBlinkBatchTableTestCase
-from pyflink.util.exceptions import TableException
 from pyflink.util.utils import get_j_env_configuration
+from pyflink.table.explain_detail import ExplainDetail
 
 
 class TableEnvironmentTest(object):
@@ -242,6 +242,33 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
         actual = t_env.explain(extended=True)
         assert isinstance(actual, str)
 
+    def test_explain_sql_without_explain_detail(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+
+        result = t_env.explain_sql("select a + 1, b, c from %s" % source)
+
+        assert isinstance(result, str)
+
+    def test_explain_sql_with_explain_detail(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+
+        result = t_env.explain_sql(
+            "select a + 1, b, c from %s" % source, ExplainDetail.ESTIMATED_COST)
+
+        assert isinstance(result, str)
+
     def test_create_table_environment(self):
         table_config = TableConfig()
         table_config.set_max_generated_code_length(32000)
diff --git a/flink-python/pyflink/util/utils.py b/flink-python/pyflink/util/utils.py
index 89db742..29a20da 100644
--- a/flink-python/pyflink/util/utils.py
+++ b/flink-python/pyflink/util/utils.py
@@ -125,3 +125,23 @@ def add_jars_to_context_class_loader(jar_urls):
     new_classloader = gateway.jvm.java.net.URLClassLoader(
         to_jarray(gateway.jvm.java.net.URL, j_urls), context_classloader)
     gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)
+
+
+def to_j_explain_detail_arr(p_extra_details):
+    # sphinx will check "import loop" when generating doc,
+    # use local import to avoid above error
+    from pyflink.table.explain_detail import ExplainDetail
+    gateway = get_gateway()
+
+    def to_j_explain_detail(p_extra_detail):
+        if p_extra_detail == ExplainDetail.CHANGELOG_TRAITS:
+            return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_TRAITS
+        else:
+            return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
+
+    _len = len(p_extra_details) if p_extra_details else 0
+    j_arr = gateway.new_array(gateway.jvm.org.apache.flink.table.api.ExplainDetail, _len)
+    for i in range(0, _len):
+        j_arr[i] = to_j_explain_detail(p_extra_details[i])
+
+    return j_arr
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
similarity index 50%
copy from flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
index 92f50ff..6e9d014 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
@@ -16,38 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.utils;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.operations.ModifyOperation;
-import org.apache.flink.table.operations.Operation;
-
-import java.util.List;
+package org.apache.flink.table.api;
 
 /**
- * Mocking {@link Planner} for tests.
+ * ExplainDetail defines the types of details for explain result.
  */
-public class PlannerMock implements Planner {
-
-	@Override
-	public Parser getParser() {
-		return new ParserMock();
-	}
-
-	@Override
-	public List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
-		return null;
-	}
-
-	@Override
-	public String explain(List<Operation> operations, boolean extended) {
-		return null;
-	}
-
-	@Override
-	public String[] getCompletionHints(String statement, int position) {
-		return new String[0];
-	}
+public enum ExplainDetail {
+	/**
+	 * The cost information on physical rel node estimated by optimizer.
+	 * e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory})
+	 */
+	ESTIMATED_COST,
+
+	/**
+	 * The changelog traits produced by a physical rel node.
+	 * e.g. GroupAggregate(..., changelogMode=[I,UA,D])
+	 */
+	CHANGELOG_TRAITS
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index d855b21..12d21ec 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -828,7 +828,7 @@ public interface TableEnvironment {
 	 * the result of the given {@link Table}.
 	 *
 	 * @param table The table for which the AST and execution plan will be returned.
-	 * @param extended if the plan should contain additional properties such as
+	 * @param extended if the plan should contain additional properties,
 	 * e.g. estimated cost, traits
 	 */
 	String explain(Table table, boolean extended);
@@ -837,12 +837,23 @@ public interface TableEnvironment {
 	 * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
 	 * the result of multiple-sinks plan.
 	 *
-	 * @param extended if the plan should contain additional properties such as
+	 * @param extended if the plan should contain additional properties,
 	 * e.g. estimated cost, traits
 	 */
 	String explain(boolean extended);
 
 	/**
+	 * Returns the AST of the specified statement and the execution plan to compute
+	 * the result of the given statement.
+	 *
+	 * @param statement The statement for which the AST and execution plan will be returned.
+	 * @param extraDetails The extra explain details which the explain result should include,
+	 *   e.g. estimated cost, change log trait for streaming
+	 * @return AST and the execution plan.
+	 */
+	String explainSql(String statement, ExplainDetail... extraDetails);
+
+	/**
 	 * Returns completion hints for the given statement at the given cursor position.
 	 * The completion happens case insensitively.
 	 *
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 1ca045b..610627c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.ResultKind;
 import org.apache.flink.table.api.SqlParserException;
 import org.apache.flink.table.api.Table;
@@ -136,6 +137,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	protected final FunctionCatalog functionCatalog;
 	protected final Planner planner;
 	protected final Parser parser;
+	private final boolean isStreamingMode;
 	private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
 			"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
 			"INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " +
@@ -179,6 +181,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		this.functionCatalog = functionCatalog;
 		this.planner = planner;
 		this.parser = planner.getParser();
+		this.isStreamingMode = isStreamingMode;
 		this.operationTreeBuilder = OperationTreeBuilder.create(
 			tableConfig,
 			functionCatalog.asLookup(parser::parseIdentifier),
@@ -589,14 +592,25 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 
 	@Override
 	public String explain(Table table, boolean extended) {
-		return planner.explain(Collections.singletonList(table.getQueryOperation()), extended);
+		return planner.explain(Collections.singletonList(table.getQueryOperation()), getExplainDetails(extended));
 	}
 
 	@Override
 	public String explain(boolean extended) {
 		List<Operation> operations = bufferedModifyOperations.stream()
-			.map(o -> (Operation) o).collect(Collectors.toList());
-		return planner.explain(operations, extended);
+				.map(o -> (Operation) o).collect(Collectors.toList());
+		return planner.explain(operations, getExplainDetails(extended));
+	}
+
+	@Override
+	public String explainSql(String statement, ExplainDetail... extraDetails) {
+		List<Operation> operations = parser.parse(statement);
+
+		if (operations.size() != 1) {
+			throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
+		}
+
+		return planner.explain(operations, extraDetails);
 	}
 
 	@Override
@@ -854,7 +868,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		} else if (operation instanceof ShowViewsOperation) {
 			return buildShowResult(listViews());
 		} else if (operation instanceof ExplainOperation) {
-			String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()), false);
+			String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()));
 			return TableResultImpl.builder()
 					.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
 					.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
@@ -979,6 +993,19 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		bufferedModifyOperations.addAll(modifyOperations);
 	}
 
+	@VisibleForTesting
+	protected ExplainDetail[] getExplainDetails(boolean extended) {
+		if (extended) {
+			if (isStreamingMode) {
+				return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS };
+			} else {
+				return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST };
+			}
+		} else {
+			return new ExplainDetail[0];
+		}
+	}
+
 	private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
 		validateTableSource(tableSource);
 		ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(name));
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
index 5bb9266..d926e3a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
@@ -79,10 +80,10 @@ public interface Planner {
 	 *
 	 * @param operations The collection of relational queries for which the AST
 	 * and execution plan will be returned.
-	 * @param extended if the plan should contain additional properties such as
-	 * e.g. estimated cost, traits
+	 * @param extraDetails The extra explain details which the explain result should include,
+	 *   e.g. estimated cost, change log trait for streaming
 	 */
-	String explain(List<Operation> operations, boolean extended);
+	String explain(List<Operation> operations, ExplainDetail... extraDetails);
 
 	/**
 	 * Returns completion hints for the given statement at the given cursor position.
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
index 92f50ff..42b5403 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.utils;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.operations.ModifyOperation;
@@ -42,7 +43,7 @@ public class PlannerMock implements Planner {
 	}
 
 	@Override
-	public String explain(List<Operation> operations, boolean extended) {
+	public String explain(List<Operation> operations, ExplainDetail... extraDetails) {
 		return null;
 	}
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 9161753..f97e015 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
 import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
@@ -78,7 +78,7 @@ class BatchPlanner(
     }
   }
 
-  override def explain(operations: util.List[Operation], extended: Boolean): String = {
+  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
@@ -122,10 +122,10 @@ class BatchPlanner(
 
     sb.append("== Optimized Logical Plan ==")
     sb.append(System.lineSeparator)
-    val explainLevel = if (extended) {
+    val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) {
       SqlExplainLevel.ALL_ATTRIBUTES
     } else {
-      SqlExplainLevel.DIGEST_ATTRIBUTES
+      SqlExplainLevel.EXPPLAN_ATTRIBUTES
     }
     sb.append(ExecNodePlanDumper.dagToString(execNodes, explainLevel))
     sb.append(System.lineSeparator)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 7006533..959de06 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
 import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
@@ -69,7 +69,7 @@ class StreamPlanner(
     }
   }
 
-  override def explain(operations: util.List[Operation], extended: Boolean): String = {
+  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
@@ -109,11 +109,12 @@ class StreamPlanner(
 
     sb.append("== Optimized Logical Plan ==")
     sb.append(System.lineSeparator)
-    val (explainLevel, withChangelogTraits) = if (extended) {
-      (SqlExplainLevel.ALL_ATTRIBUTES, true)
+    val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) {
+      SqlExplainLevel.ALL_ATTRIBUTES
     } else {
-      (SqlExplainLevel.DIGEST_ATTRIBUTES, false)
+      SqlExplainLevel.DIGEST_ATTRIBUTES
     }
+    val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_TRAITS)
     sb.append(ExecNodePlanDumper.dagToString(
       execNodes,
       explainLevel,
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
new file mode 100644
index 0000000..870269f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalFilter(condition=[>($0, 10)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, b], where=[>(a, 10)])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : Source: Custom Source
+
+	 : Operator
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : Calc(select=[a, b], where=[(a > 10)])
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : SinkConversionToRow
+				ship_strategy : FORWARD
+
+				 : Data Sink
+					content : Sink: Unnamed
+					ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
new file mode 100644
index 0000000..0c87ae3
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($0, 10)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[a, b, c], where=[>(a, 10)], changelogMode=[I])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : Source: Custom Source
+
+	 : Operator
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : Calc(select=[a, b, c], where=[(a > 10)])
+			ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index a27b47a..0e197ba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -894,6 +894,63 @@ class TableEnvironmentTest {
     testUnsupportedExplain("explain plan as json for select * from MyTable")
   }
 
+  @Test
+  def testExplainSqlWithSelect(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = tableEnv.explainSql(
+      "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_TRAITS)
+    val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
+  @Test
+  def testExplainSqlWithInsert(): Unit = {
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val actual = tableEnv.explainSql(
+      "insert into MySink select a, b from MyTable where a > 10")
+    val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithInsert.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def testUnsupportedExplain(explain: String): Unit = {
     try {
       tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 4cd0f5d..2d50f43 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1091,7 +1091,7 @@ class TestingTableEnvironment private(
   }
 
   override def explain(extended: Boolean): String = {
-    planner.explain(bufferedOperations.toList, extended)
+    planner.explain(bufferedOperations.toList, getExplainDetails(extended): _*)
   }
 
   @throws[Exception]
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index efc38a5..b3caf20 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -217,16 +217,20 @@ abstract class BatchTableEnvImpl(
     * @param extended Flag to include detailed optimizer estimates.
     */
   private[flink] def explain(table: Table, extended: Boolean): String = {
-    explain(JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), extended)
+    explain(
+      JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]),
+      getExplainDetails(extended): _*)
   }
 
   override def explain(table: Table): String = explain(table: Table, extended = false)
 
   override def explain(extended: Boolean): String = {
-    explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
+    explain(
+      bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava,
+      getExplainDetails(extended): _*)
   }
 
-   protected def explain(operations: JList[Operation], extended: Boolean): String = {
+  protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astList = operations.asScala.map {
       case queryOperation: QueryOperation =>
@@ -285,6 +289,8 @@ abstract class BatchTableEnvImpl(
 
     val env = dataSinks.head.getDataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan
+    // keep the behavior as before
+    val extended = extraDetails.contains(ExplainDetail.ESTIMATED_COST)
     val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
 
     s"== Abstract Syntax Tree ==" +
@@ -597,6 +603,14 @@ abstract class BatchTableEnvImpl(
     TableSchema.builder().fields(originalNames, fieldTypes).build()
   }
 
+  private def getExplainDetails(extended: Boolean): Array[ExplainDetail] = {
+    if (extended) {
+      Array(ExplainDetail.ESTIMATED_COST)
+    } else {
+      Array.empty
+    }
+  }
+
   protected def createDummyBatchTableEnv(): BatchTableEnvImpl
 
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 7c6f144..1f01186 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -762,14 +762,13 @@ abstract class TableEnvImpl(
       case _: ShowViewsOperation =>
         buildShowResult(listViews())
       case explainOperation: ExplainOperation =>
-        val explanation = explain(
-          JCollections.singletonList(explainOperation.getChild),
-          extended = false)
+        val explanation = explain(JCollections.singletonList(explainOperation.getChild))
         TableResultImpl.builder.
           resultKind(ResultKind.SUCCESS_WITH_CONTENT)
           .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
           .data(JCollections.singletonList(Row.of(explanation)))
           .build
+
       case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
     }
   }
@@ -1135,7 +1134,18 @@ abstract class TableEnvImpl(
     }
   }
 
-  protected def explain(operations: JList[Operation], extended: Boolean): String
+  override def explainSql(statement: String, extraDetails: ExplainDetail*): String = {
+    val operations = parser.parse(statement)
+
+    if (operations.size != 1) {
+      throw new TableException(
+        "Unsupported SQL query! explainSql() only accepts a single SQL query.")
+    }
+
+    explain(operations, extraDetails: _*)
+  }
+
+  protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String
 
   override def fromValues(values: Expression*): Table = {
     createTable(operationTreeBuilder.values(values: _*))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 756d9ca..d81ca1c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -120,7 +120,7 @@ class StreamPlanner(
     }.filter(Objects.nonNull).asJava
   }
 
-  override def explain(operations: util.List[Operation], extended: Boolean): String = {
+  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astWithUpdatesAsRetractionTuples = operations.asScala.map {
       case queryOperation: QueryOperation =>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index d09bd5d..c928314 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -24,8 +24,7 @@ import org.apache.flink.table.api.{ResultKind, TableException}
 import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
 import org.apache.flink.table.runtime.stream.sql.FunctionITCase.{SimpleScalarFunction, TestUDF}
 import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId, _}
 import org.apache.flink.types.Row
 
 import org.hamcrest.Matchers.containsString
@@ -447,6 +446,64 @@ class BatchTableEnvironmentTest extends TableTestBase {
       "explain plan as json for select * from MyTable")
   }
 
+  @Test
+  def testExplainSqlWithSelect(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = util.tableEnv.explainSql("select * from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithSelect1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
+  @Test
+  def testExplainSqlWithInsert(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val actual = util.tableEnv.explainSql(
+      "insert into MySink select a, b from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithInsert1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
     try {
       tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 439fadb..25bb536 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -326,6 +326,64 @@ class StreamTableEnvironmentTest extends TableTestBase {
     }
   }
 
+  @Test
+  def testExplainSqlWithSelect(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = util.tableEnv.explainSql("select * from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithSelect0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
+  @Test
+  def testExplainSqlWithInsert(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val actual = util.tableEnv.explainSql(
+      "insert into MySink select a, b from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithInsert0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def prepareSchemaExpressionParser:
     (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 8a9d9c4..312d980 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.utils
 
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableResult}
+import org.apache.flink.table.api.{ExplainDetail, Table, TableConfig, TableEnvironment, TableResult}
 import org.apache.flink.table.catalog.Catalog
 import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.expressions.Expression
@@ -74,6 +74,8 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def explain(extended: Boolean): String = ???
 
+  override def explainSql(statement: String, extraDetails: ExplainDetail*): String = ???
+
   override def getCompletionHints(statement: String, position: Int): Array[String] = ???
 
   override def sqlQuery(query: String): Table = ???
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
new file mode 100644
index 0000000..bbd0f53
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  LogicalProject(a=[$0], b=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  DataStreamCalc(select=[a, b], where=[>(a, 10)])
+    StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : from: (a, b)
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : where: (>(a, 10)), select: (a, b)
+			ship_strategy : FORWARD
+
+			 : Operator
+				content : to: Row
+				ship_strategy : FORWARD
+
+				 : Data Sink
+					content : Sink: Unnamed
+					ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
new file mode 100644
index 0000000..b904056
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
@@ -0,0 +1,43 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  LogicalProject(a=[$0], b=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  DataSetCalc(select=[a, b], where=[>(a, 10)])
+    BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : Map
+		content : from: (a, b)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		 : FlatMap
+			content : where: (>(a, 10)), select: (a, b)
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			driver_strategy : FlatMap
+			Partitioning : RANDOM_PARTITIONED
+
+			 : Map
+				content : to: Row
+				ship_strategy : Forward
+				exchange_mode : PIPELINED
+				driver_strategy : Map
+				Partitioning : RANDOM_PARTITIONED
+
+				 : Data Sink
+					content : org.apache.flink.api.java.io.LocalCollectionOutputFormat
+					ship_strategy : Forward
+					exchange_mode : PIPELINED
+					Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
new file mode 100644
index 0000000..4459ad6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b, c], where=[>(a, 10)])
+  StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+	 : Operator
+		content : Map
+		ship_strategy : FORWARD
+
+		 : Operator
+			content : where: (>(a, 10)), select: (a, b, c)
+			ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
new file mode 100644
index 0000000..91e87ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b, c], where=[>(a, 10)])
+  BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	 : FlatMap
+		content : where: (>(a, 10)), select: (a, b, c)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : FlatMap
+		Partitioning : RANDOM_PARTITIONED
+
+		 : Data Sink
+			content : org.apache.flink.api.java.io.DiscardingOutputFormat
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+