You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/08 12:57:31 UTC
[flink] 04/05: [FLINK-17267] [table] Introduce
TableEnvironment#explainSql api
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a5fc5f28a1c3569da8de953f6cd1fad5371f6a4
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Apr 29 10:57:32 2020 +0800
[FLINK-17267] [table] Introduce TableEnvironment#explainSql api
---
flink-python/pyflink/table/__init__.py | 4 +-
flink-python/pyflink/table/explain_detail.py | 34 ++++++++++++
flink-python/pyflink/table/table_environment.py | 20 ++++++-
.../table/tests/test_table_environment_api.py | 29 +++++++++-
flink-python/pyflink/util/utils.py | 20 +++++++
.../org/apache/flink/table/api/ExplainDetail.java} | 45 +++++-----------
.../apache/flink/table/api/TableEnvironment.java | 15 +++++-
.../table/api/internal/TableEnvironmentImpl.java | 35 +++++++++++--
.../org/apache/flink/table/delegation/Planner.java | 7 +--
.../org/apache/flink/table/utils/PlannerMock.java | 3 +-
.../table/planner/delegation/BatchPlanner.scala | 8 +--
.../table/planner/delegation/StreamPlanner.scala | 11 ++--
.../resources/explain/testExplainSqlWithInsert.out | 31 +++++++++++
.../resources/explain/testExplainSqlWithSelect.out | 21 ++++++++
.../flink/table/api/TableEnvironmentTest.scala | 57 ++++++++++++++++++++
.../flink/table/planner/utils/TableTestBase.scala | 2 +-
.../table/api/internal/BatchTableEnvImpl.scala | 20 +++++--
.../flink/table/api/internal/TableEnvImpl.scala | 18 +++++--
.../apache/flink/table/planner/StreamPlanner.scala | 2 +-
.../api/batch/BatchTableEnvironmentTest.scala | 61 +++++++++++++++++++++-
.../api/stream/StreamTableEnvironmentTest.scala | 58 ++++++++++++++++++++
.../flink/table/utils/MockTableEnvironment.scala | 4 +-
.../scala/resources/testExplainSqlWithInsert0.out | 31 +++++++++++
.../scala/resources/testExplainSqlWithInsert1.out | 43 +++++++++++++++
.../scala/resources/testExplainSqlWithSelect0.out | 21 ++++++++
.../scala/resources/testExplainSqlWithSelect1.out | 27 ++++++++++
26 files changed, 562 insertions(+), 65 deletions(-)
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 140c6b3..1e367f3 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -70,6 +70,7 @@ from pyflink.table.sources import TableSource, CsvTableSource
from pyflink.table.types import DataTypes, UserDefinedType, Row
from pyflink.table.table_schema import TableSchema
from pyflink.table.udf import FunctionContext, ScalarFunction
+from pyflink.table.explain_detail import ExplainDetail
__all__ = [
'TableEnvironment',
@@ -93,5 +94,6 @@ __all__ = [
'TableSchema',
'FunctionContext',
'ScalarFunction',
- 'SqlDialect'
+ 'SqlDialect',
+ 'ExplainDetail'
]
diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py
new file mode 100644
index 0000000..48e7ce9
--- /dev/null
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -0,0 +1,34 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+__all__ = ['ExplainDetail']
+
+
+class ExplainDetail(object):
+ """
+ ExplainDetail defines the types of details for explain result.
+
+ The constants are passed as extra arguments to ``TableEnvironment.explain_sql`` and are
+ translated to the Java ``org.apache.flink.table.api.ExplainDetail`` enum constants of the
+ same name before the call is forwarded to the JVM.
+ """
+
+ # The cost information on physical rel node estimated by optimizer.
+ # e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network,
+ # 0.0 memory})
+ ESTIMATED_COST = 0
+
+ # The changelog traits produced by a physical rel node.
+ # e.g. GroupAggregate(..., changelogMode=[I,UA,D])
+ CHANGELOG_TRAITS = 1
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index d8e8c51..94ff785 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -36,7 +36,8 @@ from pyflink.table import Table
from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \
_infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema
from pyflink.util import utils
-from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class
+from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class, \
+ to_j_explain_detail_arr
__all__ = [
'BatchTableEnvironment',
@@ -468,6 +469,23 @@ class TableEnvironment(object):
else:
return self._j_tenv.explain(table._j_table, extended)
+ def explain_sql(self, stmt, *extra_details):
+ """
+ Returns the AST of the specified statement and the execution plan to compute
+ the result of the given statement.
+
+ :param stmt: The statement for which the AST and execution plan will be returned.
+ :type stmt: str
+ :param extra_details: The extra explain details which the explain result should include,
+ e.g. estimated cost, changelog traits for streaming
+ :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
+ :return: The AST and the execution plan of the given statement.
+ :rtype: str
+ """
+
+ # Convert the Python-side constants into a Java ExplainDetail[] for the varargs call.
+ j_extra_details = to_j_explain_detail_arr(extra_details)
+ return self._j_tenv.explainSql(stmt, j_extra_details)
+
def sql_query(self, query):
"""
Evaluates a SQL query on registered tables and retrieves the result as a
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 87c8023..bd279af 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -35,8 +35,8 @@ from pyflink.table.types import RowType
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase, \
PyFlinkBlinkBatchTableTestCase
-from pyflink.util.exceptions import TableException
from pyflink.util.utils import get_j_env_configuration
+from pyflink.table.explain_detail import ExplainDetail
class TableEnvironmentTest(object):
@@ -242,6 +242,33 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
actual = t_env.explain(extended=True)
assert isinstance(actual, str)
+ def test_explain_sql_without_explain_detail(self):
+ t_env = self.t_env
+ source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+ field_names = ["a", "b", "c"]
+ field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+ t_env.register_table_sink(
+ "sinks",
+ source_sink_utils.TestAppendSink(field_names, field_types))
+
+ result = t_env.explain_sql("select a + 1, b, c from %s" % source)
+
+ assert isinstance(result, str)
+
+ def test_explain_sql_with_explain_detail(self):
+ t_env = self.t_env
+ source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+ field_names = ["a", "b", "c"]
+ field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+ t_env.register_table_sink(
+ "sinks",
+ source_sink_utils.TestAppendSink(field_names, field_types))
+
+ result = t_env.explain_sql(
+ "select a + 1, b, c from %s" % source, ExplainDetail.ESTIMATED_COST)
+
+ assert isinstance(result, str)
+
def test_create_table_environment(self):
table_config = TableConfig()
table_config.set_max_generated_code_length(32000)
diff --git a/flink-python/pyflink/util/utils.py b/flink-python/pyflink/util/utils.py
index 89db742..29a20da 100644
--- a/flink-python/pyflink/util/utils.py
+++ b/flink-python/pyflink/util/utils.py
@@ -125,3 +125,23 @@ def add_jars_to_context_class_loader(jar_urls):
new_classloader = gateway.jvm.java.net.URLClassLoader(
to_jarray(gateway.jvm.java.net.URL, j_urls), context_classloader)
gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)
+
+
+def to_j_explain_detail_arr(p_extra_details):
+ # sphinx will check "import loop" when generating doc,
+ # use local import to avoid above error
+ from pyflink.table.explain_detail import ExplainDetail
+ gateway = get_gateway()
+
+ def to_j_explain_detail(p_extra_detail):
+ if p_extra_detail == ExplainDetail.CHANGELOG_TRAITS:
+ return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_TRAITS
+ else:
+ return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
+
+ _len = len(p_extra_details) if p_extra_details else 0
+ j_arr = gateway.new_array(gateway.jvm.org.apache.flink.table.api.ExplainDetail, _len)
+ for i in range(0, _len):
+ j_arr[i] = to_j_explain_detail(p_extra_details[i])
+
+ return j_arr
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
similarity index 50%
copy from flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
index 92f50ff..6e9d014 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
@@ -16,38 +16,21 @@
* limitations under the License.
*/
-package org.apache.flink.table.utils;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.operations.ModifyOperation;
-import org.apache.flink.table.operations.Operation;
-
-import java.util.List;
+package org.apache.flink.table.api;
/**
- * Mocking {@link Planner} for tests.
+ * ExplainDetail defines the types of details for explain result.
*/
-public class PlannerMock implements Planner {
-
- @Override
- public Parser getParser() {
- return new ParserMock();
- }
-
- @Override
- public List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
- return null;
- }
-
- @Override
- public String explain(List<Operation> operations, boolean extended) {
- return null;
- }
-
- @Override
- public String[] getCompletionHints(String statement, int position) {
- return new String[0];
- }
+public enum ExplainDetail {
+ /**
+ * The cost information on physical rel node estimated by optimizer.
+ * e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory})
+ */
+ ESTIMATED_COST,
+
+ /**
+ * The changelog traits produced by a physical rel node.
+ * e.g. GroupAggregate(..., changelogMode=[I,UA,D])
+ */
+ CHANGELOG_TRAITS
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index d855b21..12d21ec 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -828,7 +828,7 @@ public interface TableEnvironment {
* the result of the given {@link Table}.
*
* @param table The table for which the AST and execution plan will be returned.
- * @param extended if the plan should contain additional properties such as
+ * @param extended if the plan should contain additional properties,
* e.g. estimated cost, traits
*/
String explain(Table table, boolean extended);
@@ -837,12 +837,23 @@ public interface TableEnvironment {
* Returns the AST of the specified Table API and SQL queries and the execution plan to compute
* the result of multiple-sinks plan.
*
- * @param extended if the plan should contain additional properties such as
+ * @param extended if the plan should contain additional properties,
* e.g. estimated cost, traits
*/
String explain(boolean extended);
/**
+ * Returns the AST of the specified statement and the execution plan to compute
+ * the result of the given statement.
+ *
+ * @param statement The statement for which the AST and execution plan will be returned.
+ * @param extraDetails The extra explain details which the explain result should include,
+ * e.g. estimated cost, changelog traits for streaming
+ * @return AST and the execution plan.
+ */
+ String explainSql(String statement, ExplainDetail... extraDetails);
+
+ /**
* Returns completion hints for the given statement at the given cursor position.
* The completion happens case insensitively.
*
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 1ca045b..610627c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.dag.Transformation;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.Table;
@@ -136,6 +137,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
protected final FunctionCatalog functionCatalog;
protected final Planner planner;
protected final Parser parser;
+ private final boolean isStreamingMode;
private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
"INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " +
@@ -179,6 +181,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
this.functionCatalog = functionCatalog;
this.planner = planner;
this.parser = planner.getParser();
+ this.isStreamingMode = isStreamingMode;
this.operationTreeBuilder = OperationTreeBuilder.create(
tableConfig,
functionCatalog.asLookup(parser::parseIdentifier),
@@ -589,14 +592,25 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
@Override
public String explain(Table table, boolean extended) {
- return planner.explain(Collections.singletonList(table.getQueryOperation()), extended);
+ return planner.explain(Collections.singletonList(table.getQueryOperation()), getExplainDetails(extended));
}
@Override
public String explain(boolean extended) {
List<Operation> operations = bufferedModifyOperations.stream()
- .map(o -> (Operation) o).collect(Collectors.toList());
- return planner.explain(operations, extended);
+ .map(o -> (Operation) o).collect(Collectors.toList());
+ return planner.explain(operations, getExplainDetails(extended));
+ }
+
+ @Override
+ public String explainSql(String statement, ExplainDetail... extraDetails) {
+ List<Operation> operations = parser.parse(statement);
+
+ if (operations.size() != 1) {
+ throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
+ }
+
+ return planner.explain(operations, extraDetails);
}
@Override
@@ -854,7 +868,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
} else if (operation instanceof ShowViewsOperation) {
return buildShowResult(listViews());
} else if (operation instanceof ExplainOperation) {
- String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()), false);
+ String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()));
return TableResultImpl.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
@@ -979,6 +993,19 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
bufferedModifyOperations.addAll(modifyOperations);
}
+ /**
+ * Translates the legacy {@code extended} flag of the explain methods into explain details.
+ *
+ * @param extended whether the extended explain output was requested
+ * @return an empty array if {@code extended} is false; otherwise ESTIMATED_COST,
+ * plus CHANGELOG_TRAITS when this environment was created in streaming mode
+ */
+ @VisibleForTesting
+ protected ExplainDetail[] getExplainDetails(boolean extended) {
+ if (extended) {
+ if (isStreamingMode) {
+ return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS };
+ } else {
+ return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST };
+ }
+ } else {
+ return new ExplainDetail[0];
+ }
+ }
+
private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
validateTableSource(tableSource);
ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(name));
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
index 5bb9266..d926e3a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.delegation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
@@ -79,10 +80,10 @@ public interface Planner {
*
* @param operations The collection of relational queries for which the AST
* and execution plan will be returned.
- * @param extended if the plan should contain additional properties such as
- * e.g. estimated cost, traits
+ * @param extraDetails The extra explain details which the explain result should include,
+ * e.g. estimated cost, changelog traits for streaming
*/
- String explain(List<Operation> operations, boolean extended);
+ String explain(List<Operation> operations, ExplainDetail... extraDetails);
/**
* Returns completion hints for the given statement at the given cursor position.
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
index 92f50ff..42b5403 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.utils;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.operations.ModifyOperation;
@@ -42,7 +43,7 @@ public class PlannerMock implements Planner {
}
@Override
- public String explain(List<Operation> operations, boolean extended) {
+ public String explain(List<Operation> operations, ExplainDetail... extraDetails) {
return null;
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 9161753..f97e015 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.delegation
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
import org.apache.flink.table.delegation.Executor
import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
@@ -78,7 +78,7 @@ class BatchPlanner(
}
}
- override def explain(operations: util.List[Operation], extended: Boolean): String = {
+ override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
require(operations.nonEmpty, "operations should not be empty")
val sinkRelNodes = operations.map {
case queryOperation: QueryOperation =>
@@ -122,10 +122,10 @@ class BatchPlanner(
sb.append("== Optimized Logical Plan ==")
sb.append(System.lineSeparator)
- val explainLevel = if (extended) {
+ val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) {
SqlExplainLevel.ALL_ATTRIBUTES
} else {
- SqlExplainLevel.DIGEST_ATTRIBUTES
+ SqlExplainLevel.EXPPLAN_ATTRIBUTES
}
sb.append(ExecNodePlanDumper.dagToString(execNodes, explainLevel))
sb.append(System.lineSeparator)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 7006533..959de06 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.delegation
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
import org.apache.flink.table.delegation.Executor
import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
@@ -69,7 +69,7 @@ class StreamPlanner(
}
}
- override def explain(operations: util.List[Operation], extended: Boolean): String = {
+ override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
require(operations.nonEmpty, "operations should not be empty")
val sinkRelNodes = operations.map {
case queryOperation: QueryOperation =>
@@ -109,11 +109,12 @@ class StreamPlanner(
sb.append("== Optimized Logical Plan ==")
sb.append(System.lineSeparator)
- val (explainLevel, withChangelogTraits) = if (extended) {
- (SqlExplainLevel.ALL_ATTRIBUTES, true)
+ val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) {
+ SqlExplainLevel.ALL_ATTRIBUTES
} else {
- (SqlExplainLevel.DIGEST_ATTRIBUTES, false)
+ SqlExplainLevel.DIGEST_ATTRIBUTES
}
+ val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_TRAITS)
sb.append(ExecNodePlanDumper.dagToString(
execNodes,
explainLevel,
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
new file mode 100644
index 0000000..870269f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, b], where=[>(a, 10)])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : Source: Custom Source
+
+ : Operator
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select=[a, b], where=[(a > 10)])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: Unnamed
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
new file mode 100644
index 0000000..0c87ae3
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[a, b, c], where=[>(a, 10)], changelogMode=[I])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : Source: Custom Source
+
+ : Operator
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select=[a, b, c], where=[(a > 10)])
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index a27b47a..0e197ba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -894,6 +894,63 @@ class TableEnvironmentTest {
testUnsupportedExplain("explain plan as json for select * from MyTable")
}
+ @Test
+ def testExplainSqlWithSelect(): Unit = {
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val actual = tableEnv.explainSql(
+ "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_TRAITS)
+ val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ }
+
+ @Test
+ def testExplainSqlWithInsert(): Unit = {
+ val createTableStmt1 =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt1)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val createTableStmt2 =
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult2 = tableEnv.executeSql(createTableStmt2)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+ val actual = tableEnv.explainSql(
+ "insert into MySink select a, b from MyTable where a > 10")
+ val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithInsert.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ }
+
private def testUnsupportedExplain(explain: String): Unit = {
try {
tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 4cd0f5d..2d50f43 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1091,7 +1091,7 @@ class TestingTableEnvironment private(
}
override def explain(extended: Boolean): String = {
- planner.explain(bufferedOperations.toList, extended)
+ planner.explain(bufferedOperations.toList, getExplainDetails(extended): _*)
}
@throws[Exception]
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index efc38a5..b3caf20 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -217,16 +217,20 @@ abstract class BatchTableEnvImpl(
* @param extended Flag to include detailed optimizer estimates.
*/
private[flink] def explain(table: Table, extended: Boolean): String = {
- explain(JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), extended)
+ explain(
+ JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]),
+ getExplainDetails(extended): _*)
}
override def explain(table: Table): String = explain(table: Table, extended = false)
override def explain(extended: Boolean): String = {
- explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
+ explain(
+ bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava,
+ getExplainDetails(extended): _*)
}
- protected def explain(operations: JList[Operation], extended: Boolean): String = {
+ protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String = {
require(operations.asScala.nonEmpty, "operations should not be empty")
val astList = operations.asScala.map {
case queryOperation: QueryOperation =>
@@ -285,6 +289,8 @@ abstract class BatchTableEnvImpl(
val env = dataSinks.head.getDataSet.getExecutionEnvironment
val jasonSqlPlan = env.getExecutionPlan
+ // keep the behavior as before
+ val extended = extraDetails.contains(ExplainDetail.ESTIMATED_COST)
val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
s"== Abstract Syntax Tree ==" +
@@ -597,6 +603,14 @@ abstract class BatchTableEnvImpl(
TableSchema.builder().fields(originalNames, fieldTypes).build()
}
+ /**
+ * Translates the legacy `extended` flag of the explain methods into explain details:
+ * ESTIMATED_COST when `extended` is true, no details otherwise.
+ */
+ private def getExplainDetails(extended: Boolean): Array[ExplainDetail] = {
+ if (extended) {
+ Array(ExplainDetail.ESTIMATED_COST)
+ } else {
+ Array.empty
+ }
+ }
+
protected def createDummyBatchTableEnv(): BatchTableEnvImpl
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 7c6f144..1f01186 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -762,14 +762,13 @@ abstract class TableEnvImpl(
case _: ShowViewsOperation =>
buildShowResult(listViews())
case explainOperation: ExplainOperation =>
- val explanation = explain(
- JCollections.singletonList(explainOperation.getChild),
- extended = false)
+ val explanation = explain(JCollections.singletonList(explainOperation.getChild))
TableResultImpl.builder.
resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
.data(JCollections.singletonList(Row.of(explanation)))
.build
+
case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
}
}
@@ -1135,7 +1134,18 @@ abstract class TableEnvImpl(
}
}
- protected def explain(operations: JList[Operation], extended: Boolean): String
+ /**
+ * Returns the plan explanation for exactly one SQL statement.
+ *
+ * @param statement SQL text; it must parse into a single operation
+ * @param extraDetails additional details forwarded to `explain`
+ * @throws TableException if the statement does not parse into exactly one operation
+ */
+ override def explainSql(statement: String, extraDetails: ExplainDetail*): String = {
+ val parsed = parser.parse(statement)
+ if (parsed.size == 1) {
+ explain(parsed, extraDetails: _*)
+ } else {
+ throw new TableException(
+ "Unsupported SQL query! explainSql() only accepts a single SQL query.")
+ }
+ }
+
+ protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String
override def fromValues(values: Expression*): Table = {
createTable(operationTreeBuilder.values(values: _*))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 756d9ca..d81ca1c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -120,7 +120,7 @@ class StreamPlanner(
}.filter(Objects.nonNull).asJava
}
- override def explain(operations: util.List[Operation], extended: Boolean): String = {
+ override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
require(operations.asScala.nonEmpty, "operations should not be empty")
val astWithUpdatesAsRetractionTuples = operations.asScala.map {
case queryOperation: QueryOperation =>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index d09bd5d..c928314 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -24,8 +24,7 @@ import org.apache.flink.table.api.{ResultKind, TableException}
import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
import org.apache.flink.table.runtime.stream.sql.FunctionITCase.{SimpleScalarFunction, TestUDF}
import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId, _}
import org.apache.flink.types.Row
import org.hamcrest.Matchers.containsString
@@ -447,6 +446,64 @@ class BatchTableEnvironmentTest extends TableTestBase {
"explain plan as json for select * from MyTable")
}
+ /**
+ * Registers MyTable through DDL on the batch environment, then compares the explainSql
+ * output of a filtered SELECT with testExplainSqlWithSelect1.out (both sides passed
+ * through replaceStageId).
+ */
+ @Test
+ def testExplainSqlWithSelect(): Unit = {
+ val tEnv = batchTestUtil().tableEnv
+ val sourceDdl =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ // The DDL must complete with a plain SUCCESS result before the query is explained.
+ assertEquals(ResultKind.SUCCESS, tEnv.executeSql(sourceDdl).getResultKind)
+
+ val actualPlan = tEnv.explainSql("select * from MyTable where a > 10")
+ val expectedPlan = readFromResource("testExplainSqlWithSelect1.out")
+ assertEquals(replaceStageId(expectedPlan), replaceStageId(actualPlan))
+ }
+
+ /**
+ * Registers MyTable and MySink through DDL on the batch environment, then compares the
+ * explainSql output of an INSERT INTO statement with testExplainSqlWithInsert1.out
+ * (both sides passed through replaceStageId).
+ */
+ @Test
+ def testExplainSqlWithInsert(): Unit = {
+ val tEnv = batchTestUtil().tableEnv
+ val sourceDdl =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ // Source table first: its DDL must complete with a plain SUCCESS result.
+ assertEquals(ResultKind.SUCCESS, tEnv.executeSql(sourceDdl).getResultKind)
+
+ val sinkDdl =
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ // Then the sink table targeted by the INSERT.
+ assertEquals(ResultKind.SUCCESS, tEnv.executeSql(sinkDdl).getResultKind)
+
+ val actualPlan = tEnv.explainSql(
+ "insert into MySink select a, b from MyTable where a > 10")
+ val expectedPlan = readFromResource("testExplainSqlWithInsert1.out")
+ assertEquals(replaceStageId(expectedPlan), replaceStageId(actualPlan))
+ }
+
private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
try {
tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 439fadb..25bb536 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -326,6 +326,64 @@ class StreamTableEnvironmentTest extends TableTestBase {
}
}
+ /**
+ * Registers MyTable through DDL on the stream environment, then compares the explainSql
+ * output of a filtered SELECT with testExplainSqlWithSelect0.out (both sides passed
+ * through replaceStageId).
+ */
+ @Test
+ def testExplainSqlWithSelect(): Unit = {
+ val tEnv = streamTestUtil().tableEnv
+ val sourceDdl =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ // The DDL must complete with a plain SUCCESS result before the query is explained.
+ assertEquals(ResultKind.SUCCESS, tEnv.executeSql(sourceDdl).getResultKind)
+
+ val actualPlan = tEnv.explainSql("select * from MyTable where a > 10")
+ val expectedPlan = readFromResource("testExplainSqlWithSelect0.out")
+ assertEquals(replaceStageId(expectedPlan), replaceStageId(actualPlan))
+ }
+
+ /**
+ * Registers MyTable and MySink through DDL on the stream environment, then compares the
+ * explainSql output of an INSERT INTO statement with testExplainSqlWithInsert0.out
+ * (both sides passed through replaceStageId).
+ */
+ @Test
+ def testExplainSqlWithInsert(): Unit = {
+ val tEnv = streamTestUtil().tableEnv
+ val sourceDdl =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ // Source table first: its DDL must complete with a plain SUCCESS result.
+ assertEquals(ResultKind.SUCCESS, tEnv.executeSql(sourceDdl).getResultKind)
+
+ val sinkDdl =
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ // Then the sink table targeted by the INSERT.
+ assertEquals(ResultKind.SUCCESS, tEnv.executeSql(sinkDdl).getResultKind)
+
+ val actualPlan = tEnv.explainSql(
+ "insert into MySink select a, b from MyTable where a > 10")
+ val expectedPlan = readFromResource("testExplainSqlWithInsert0.out")
+ assertEquals(replaceStageId(expectedPlan), replaceStageId(actualPlan))
+ }
+
private def prepareSchemaExpressionParser:
(JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 8a9d9c4..312d980 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.utils
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableResult}
+import org.apache.flink.table.api.{ExplainDetail, Table, TableConfig, TableEnvironment, TableResult}
import org.apache.flink.table.catalog.Catalog
import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.expressions.Expression
@@ -74,6 +74,8 @@ class MockTableEnvironment extends TableEnvironment {
override def explain(extended: Boolean): String = ???
+ override def explainSql(statement: String, extraDetails: ExplainDetail*): String = ??? // unimplemented stub: ??? throws NotImplementedError when called
+
override def getCompletionHints(statement: String, position: Int): Array[String] = ???
override def sqlQuery(query: String): Table = ???
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
new file mode 100644
index 0000000..bbd0f53
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+ LogicalProject(a=[$0], b=[$1])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+ DataStreamCalc(select=[a, b], where=[>(a, 10)])
+ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : from: (a, b)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : where: (>(a, 10)), select: (a, b)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : to: Row
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: Unnamed
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
new file mode 100644
index 0000000..b904056
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
@@ -0,0 +1,43 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+ LogicalProject(a=[$0], b=[$1])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+ DataSetCalc(select=[a, b], where=[>(a, 10)])
+ BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : from: (a, b)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : FlatMap
+ content : where: (>(a, 10)), select: (a, b)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : FlatMap
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : to: Row
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : org.apache.flink.api.java.io.LocalCollectionOutputFormat
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
new file mode 100644
index 0000000..4459ad6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b, c], where=[>(a, 10)])
+ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : Map
+ ship_strategy : FORWARD
+
+ : Operator
+ content : where: (>(a, 10)), select: (a, b, c)
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
new file mode 100644
index 0000000..91e87ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b, c], where=[>(a, 10)])
+ BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : FlatMap
+ content : where: (>(a, 10)), select: (a, b, c)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : FlatMap
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : org.apache.flink.api.java.io.DiscardingOutputFormat
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+