You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/08 12:57:27 UTC
[flink] branch master updated (0d1738b -> 4959ebf)
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.
from 0d1738b [FLINK-17256][python] Support keyword arguments in the PyFlink descriptor API
new 59ca355 [FLINK-17267] [table-planner] legacy stream planner supports explain insert operation
new 2fc82ad [FLINK-17267] [table-planner] legacy batch planner supports explain insert operation
new 6013a62 [FLINK-17267] [table] TableEnvironment#explainSql supports EXPLAIN statement
new 1a5fc5f [FLINK-17267] [table] Introduce TableEnvironment#explainSql api
new 4959ebf [FLINK-17267] [table] Introduce Table#explain api
The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
flink-python/pyflink/table/__init__.py | 4 +-
flink-python/pyflink/table/explain_detail.py | 34 +++
flink-python/pyflink/table/table.py | 14 ++
flink-python/pyflink/table/table_environment.py | 19 +-
flink-python/pyflink/table/tests/test_explain.py | 40 +++
.../table/tests/test_table_environment_api.py | 34 ++-
flink-python/pyflink/util/utils.py | 20 ++
.../org/apache/flink/table/api/ExplainDetail.java} | 45 ++--
.../java/org/apache/flink/table/api/Table.java | 10 +
.../apache/flink/table/api/TableEnvironment.java | 15 +-
.../table/api/internal/TableEnvironmentImpl.java | 48 +++-
.../api/internal/TableEnvironmentInternal.java | 14 ++
.../apache/flink/table/api/internal/TableImpl.java | 6 +
.../flink/table/api/internal/TableResultImpl.java | 51 +++-
.../org/apache/flink/table/delegation/Planner.java | 7 +-
.../flink/table/operations/ExplainOperation.java} | 39 ++-
.../org/apache/flink/table/utils/PlannerMock.java | 3 +-
.../operations/SqlToOperationConverter.java | 19 ++
.../table/planner/calcite/FlinkPlannerImpl.scala | 11 +-
.../table/planner/delegation/BatchPlanner.scala | 30 ++-
.../table/planner/delegation/StreamPlanner.scala | 33 ++-
.../explain/testExecuteSqlWithExplainInsert.out | 31 +++
.../explain/testExecuteSqlWithExplainSelect.out | 21 ++
.../resources/explain/testExplainSqlWithInsert.out | 31 +++
.../resources/explain/testExplainSqlWithSelect.out | 21 ++
.../flink/table/api/TableEnvironmentTest.scala | 194 +++++++++++++-
.../flink/table/planner/utils/TableTestBase.scala | 2 +-
.../table/sqlexec/SqlToOperationConverter.java | 19 ++
.../table/api/internal/BatchTableEnvImpl.scala | 230 ++++++++++++++---
.../flink/table/api/internal/TableEnvImpl.scala | 88 ++++++-
.../flink/table/calcite/FlinkPlannerImpl.scala | 11 +-
.../table/calcite/RelTimeIndicatorConverter.scala | 30 +++
.../flink/table/plan/nodes/LogicalSink.scala | 55 ++++
.../org/apache/flink/table/plan/nodes/Sink.scala | 57 +++++
.../table/plan/nodes/dataset/DataSetSink.scala | 57 +++++
.../plan/nodes/datastream/DataStreamSink.scala | 204 +++++++++++++++
.../plan/nodes/logical/FlinkLogicalSink.scala | 73 ++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 9 +-
.../table/plan/rules/dataSet/DataSetSinkRule.scala | 55 ++++
.../plan/rules/datastream/DataStreamSinkRule.scala | 53 ++++
.../apache/flink/table/planner/StreamPlanner.scala | 280 +++++++--------------
.../flink/table/sinks/DataStreamTableSink.scala | 58 +++++
.../flink/table/api/TableEnvironmentITCase.scala | 17 +-
.../api/batch/BatchTableEnvironmentTest.scala | 201 ++++++++++++++-
.../apache/flink/table/api/batch/ExplainTest.scala | 71 ++++--
.../sql/validation/InsertIntoValidationTest.scala | 8 +
.../validation/InsertIntoValidationTest.scala | 4 +
.../flink/table/api/stream/ExplainTest.scala | 75 ++++--
.../api/stream/StreamTableEnvironmentTest.scala | 206 ++++++++++++++-
.../flink/table/utils/MockTableEnvironment.scala | 4 +-
.../apache/flink/table/utils/TableTestBase.scala | 10 +
.../resources/testExecuteSqlWithExplainInsert0.out | 31 +++
.../resources/testExecuteSqlWithExplainInsert1.out | 36 +++
.../resources/testExecuteSqlWithExplainSelect0.out | 21 ++
.../resources/testExecuteSqlWithExplainSelect1.out | 27 ++
.../scala/resources/testExplainSqlWithInsert0.out | 31 +++
.../scala/resources/testExplainSqlWithInsert1.out | 43 ++++
.../scala/resources/testExplainSqlWithSelect0.out | 21 ++
.../scala/resources/testExplainSqlWithSelect1.out | 27 ++
.../resources/testFromToDataStreamAndSqlUpdate.out | 23 +-
.../src/test/scala/resources/testInsert.out | 29 +++
.../src/test/scala/resources/testInsert1.out | 27 ++
.../test/scala/resources/testMultipleInserts.out | 55 ++++
.../test/scala/resources/testMultipleInserts1.out | 51 ++++
.../resources/testSqlUpdateAndToDataStream.out | 20 +-
65 files changed, 2709 insertions(+), 404 deletions(-)
create mode 100644 flink-python/pyflink/table/explain_detail.py
create mode 100644 flink-python/pyflink/table/tests/test_explain.py
copy flink-table/flink-table-api-java/src/{test/java/org/apache/flink/table/utils/PlannerMock.java => main/java/org/apache/flink/table/api/ExplainDetail.java} (50%)
copy flink-table/flink-table-api-java/src/{test/java/org/apache/flink/table/utils/PlannerMock.java => main/java/org/apache/flink/table/operations/ExplainOperation.java} (52%)
create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/LogicalSink.scala
create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/Sink.scala
create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala
create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSink.scala
create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala
create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala
create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSinkRule.scala
create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testInsert.out
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts.out
create mode 100644 flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out
[flink] 03/05: [FLINK-17267] [table] TableEnvironment#explainSql
supports EXPLAIN statement
Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6013a62523bd2fc73b6aa7b3109f6b7c138ac51c
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Apr 22 17:08:00 2020 +0800
[FLINK-17267] [table] TableEnvironment#explainSql supports EXPLAIN statement
---
.../table/api/internal/TableEnvironmentImpl.java | 9 ++
.../flink/table/operations/ExplainOperation.java | 46 ++++++++
.../operations/SqlToOperationConverter.java | 19 ++++
.../table/planner/calcite/FlinkPlannerImpl.scala | 11 +-
.../table/planner/delegation/BatchPlanner.scala | 22 +++-
.../table/planner/delegation/StreamPlanner.scala | 22 +++-
.../explain/testExecuteSqlWithExplainInsert.out | 31 +++++
.../explain/testExecuteSqlWithExplainSelect.out | 21 ++++
.../flink/table/api/TableEnvironmentTest.scala | 115 ++++++++++++++++++-
.../table/sqlexec/SqlToOperationConverter.java | 19 ++++
.../table/api/internal/BatchTableEnvImpl.scala | 71 ++++++++----
.../flink/table/api/internal/TableEnvImpl.scala | 13 ++-
.../flink/table/calcite/FlinkPlannerImpl.scala | 11 +-
.../apache/flink/table/planner/StreamPlanner.scala | 17 ++-
.../flink/table/api/TableEnvironmentITCase.scala | 13 +--
.../api/batch/BatchTableEnvironmentTest.scala | 120 +++++++++++++++++++-
.../api/stream/StreamTableEnvironmentTest.scala | 126 ++++++++++++++++++++-
.../resources/testExecuteSqlWithExplainInsert0.out | 31 +++++
.../resources/testExecuteSqlWithExplainInsert1.out | 36 ++++++
.../resources/testExecuteSqlWithExplainSelect0.out | 21 ++++
.../resources/testExecuteSqlWithExplainSelect1.out | 27 +++++
21 files changed, 744 insertions(+), 57 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 342ee55..1ca045b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -72,6 +72,7 @@ import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CatalogQueryOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
@@ -852,6 +853,14 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
return buildShowResult(listFunctions());
} else if (operation instanceof ShowViewsOperation) {
return buildShowResult(listViews());
+ } else if (operation instanceof ExplainOperation) {
+ String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()), false);
+ return TableResultImpl.builder()
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
+ .data(Collections.singletonList(Row.of(explanation)))
+ .build();
+
} else {
throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
new file mode 100644
index 0000000..c78c78f
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import java.util.Collections;
+
+/**
+ * Operation to describe an EXPLAIN statement.
+ * NOTES: currently, only default behavior(EXPLAIN PLAN FOR xx) is supported.
+ */
+public class ExplainOperation implements Operation {
+ private final Operation child;
+
+ public ExplainOperation(Operation child) {
+ this.child = child;
+ }
+
+ public Operation getChild() {
+ return child;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return OperationUtils.formatWithChildren(
+ "EXPLAIN PLAN FOR",
+ Collections.emptyMap(),
+ Collections.singletonList(child),
+ Operation::asSummaryString);
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index b303570..9f80aa1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -62,6 +62,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
import org.apache.flink.table.operations.ShowDatabasesOperation;
@@ -98,6 +99,9 @@ import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
@@ -195,6 +199,8 @@ public class SqlToOperationConverter {
return Optional.of(converter.convertDropView((SqlDropView) validated));
} else if (validated instanceof SqlShowViews) {
return Optional.of(converter.convertShowViews((SqlShowViews) validated));
+ } else if (validated instanceof SqlExplain) {
+ return Optional.of(converter.convertExplain((SqlExplain) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
@@ -577,6 +583,19 @@ public class SqlToOperationConverter {
return new ShowViewsOperation();
}
+ /** Convert EXPLAIN statement. */
+ private Operation convertExplain(SqlExplain sqlExplain) {
+ Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
+
+ if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES ||
+ sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL ||
+ sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
+ throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
+ }
+
+ return new ExplainOperation(operation);
+ }
+
/** Fallback method for sql query. */
private Operation convertSqlQuery(SqlNode node) {
return toQueryOperation(flinkPlanner, node);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index c3f72b3..846f536 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -30,7 +30,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
-import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
import java.lang.{Boolean => JBoolean}
@@ -129,7 +129,14 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowViews]) {
return sqlNode
}
- validator.validate(sqlNode)
+ sqlNode match {
+ case explain: SqlExplain =>
+ val validated = validator.validate(explain.getExplicandum)
+ explain.setOperand(0, validated)
+ explain
+ case _ =>
+ validator.validate(sqlNode)
+ }
}
catch {
case e: RuntimeException =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 2626809..9161753 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.planner.delegation
import org.apache.flink.api.dag.Transformation
import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.planner.operations.PlannerQueryOperation
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
@@ -32,6 +33,7 @@ import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOp
import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
+import org.apache.calcite.rel.logical.LogicalTableModify
import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
import org.apache.calcite.sql.SqlExplainLevel
@@ -80,7 +82,21 @@ class BatchPlanner(
require(operations.nonEmpty, "operations should not be empty")
val sinkRelNodes = operations.map {
case queryOperation: QueryOperation =>
- getRelBuilder.queryOperation(queryOperation).build()
+ val relNode = getRelBuilder.queryOperation(queryOperation).build()
+ relNode match {
+ // SQL: explain plan for insert into xx
+ case modify: LogicalTableModify =>
+ // convert LogicalTableModify to CatalogSinkModifyOperation
+ val qualifiedName = modify.getTable.getQualifiedName
+ require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+ val modifyOperation = new CatalogSinkModifyOperation(
+ ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+ new PlannerQueryOperation(modify.getInput)
+ )
+ translateToRel(modifyOperation)
+ case _ =>
+ relNode
+ }
case modifyOperation: ModifyOperation =>
translateToRel(modifyOperation)
case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index e6245f2..7006533 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.planner.delegation
import org.apache.flink.api.dag.Transformation
import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.planner.operations.PlannerQueryOperation
import org.apache.flink.table.planner.plan.`trait`._
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.planner.plan.optimize.{Optimizer, StreamCommonSubGraphBasedOptimizer}
@@ -30,6 +31,7 @@ import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOp
import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
+import org.apache.calcite.rel.logical.LogicalTableModify
import org.apache.calcite.sql.SqlExplainLevel
import java.util
@@ -71,7 +73,21 @@ class StreamPlanner(
require(operations.nonEmpty, "operations should not be empty")
val sinkRelNodes = operations.map {
case queryOperation: QueryOperation =>
- getRelBuilder.queryOperation(queryOperation).build()
+ val relNode = getRelBuilder.queryOperation(queryOperation).build()
+ relNode match {
+ // SQL: explain plan for insert into xx
+ case modify: LogicalTableModify =>
+ // convert LogicalTableModify to CatalogSinkModifyOperation
+ val qualifiedName = modify.getTable.getQualifiedName
+ require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+ val modifyOperation = new CatalogSinkModifyOperation(
+ ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+ new PlannerQueryOperation(modify.getInput)
+ )
+ translateToRel(modifyOperation)
+ case _ =>
+ relNode
+ }
case modifyOperation: ModifyOperation =>
translateToRel(modifyOperation)
case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
new file mode 100644
index 0000000..0e5e015
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(d=[$0], e=[$1])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, b], where=[>(a, 10)])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : Source: Custom Source
+
+ : Operator
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select=[a, b], where=[(a > 10)])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: Unnamed
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
new file mode 100644
index 0000000..4865193
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[a, b, c], where=[>(a, 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : Source: Custom Source
+
+ : Operator
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select=[a, b, c], where=[(a > 10)])
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index e289edb..a27b47a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -18,8 +18,6 @@
package org.apache.flink.table.api
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.sql.SqlExplainLevel
import org.apache.flink.api.common.typeinfo.Types.STRING
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
@@ -29,10 +27,14 @@ import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
import org.apache.flink.table.planner.operations.SqlConversionException
import org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase.TestUDF
import org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.SimpleScalarFunction
+import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId
import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks}
import org.apache.flink.types.Row
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.sql.SqlExplainLevel
import org.hamcrest.Matchers.containsString
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.rules.ExpectedException
import org.junit.{Rule, Test}
@@ -797,6 +799,113 @@ class TableEnvironmentTest {
tableResult5.collect())
}
+ @Test
+ def testExecuteSqlWithExplainSelect(): Unit = {
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val tableResult2 = tableEnv.executeSql("explain plan for select * from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+ val it = tableResult2.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity)
+ val actual = row.getField(0).toString
+ val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainSelect.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ assertFalse(it.hasNext)
+ }
+
+ @Test
+ def testExecuteSqlWithExplainInsert(): Unit = {
+ val createTableStmt1 =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt1)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val createTableStmt2 =
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult2 = tableEnv.executeSql(createTableStmt2)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+ val tableResult3 = tableEnv.executeSql(
+ "explain plan for insert into MySink select a, b from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+ val it = tableResult3.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity)
+ val actual = row.getField(0).toString
+ val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainInsert.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ assertFalse(it.hasNext)
+ }
+
+ @Test
+ def testExecuteSqlWithUnsupportedExplain(): Unit = {
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ // TODO we can support them later
+ testUnsupportedExplain("explain plan excluding attributes for select * from MyTable")
+ testUnsupportedExplain("explain plan including all attributes for select * from MyTable")
+ testUnsupportedExplain("explain plan with type for select * from MyTable")
+ testUnsupportedExplain("explain plan without implementation for select * from MyTable")
+ testUnsupportedExplain("explain plan as xml for select * from MyTable")
+ testUnsupportedExplain("explain plan as json for select * from MyTable")
+ }
+
+ private def testUnsupportedExplain(explain: String): Unit = {
+ try {
+ tableEnv.executeSql(explain)
+ fail("This should not happen")
+ } catch {
+ case e: TableException =>
+ assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+ case e =>
+ fail("This should not happen, " + e.getMessage)
+ }
+ }
+
private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
while (expected.hasNext && actual.hasNext) {
assertEquals(expected.next(), actual.next())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 8ebb47e..ded0d85 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -61,6 +61,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.PlannerQueryOperation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -91,6 +92,9 @@ import org.apache.flink.util.StringUtils;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
@@ -187,6 +191,8 @@ public class SqlToOperationConverter {
return Optional.of(converter.convertDropView((SqlDropView) validated));
} else if (validated instanceof SqlShowViews) {
return Optional.of(converter.convertShowViews((SqlShowViews) validated));
+ } else if (validated instanceof SqlExplain) {
+ return Optional.of(converter.convertExplain((SqlExplain) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
@@ -534,6 +540,19 @@ public class SqlToOperationConverter {
return new ShowViewsOperation();
}
+ /** Convert EXPLAIN statement. */
+ private Operation convertExplain(SqlExplain sqlExplain) {
+ Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
+
+ if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES ||
+ sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL ||
+ sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
+ throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
+ }
+
+ return new ExplainOperation(operation);
+ }
+
/**
* Create a table schema from {@link SqlCreateTable}. This schema contains computed column
* fields, say, we have a create table DDL statement:
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index e25b8e4..efc38a5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -31,14 +31,14 @@ import org.apache.flink.configuration.DeploymentOptions
import org.apache.flink.core.execution.{DetachedJobExecutionResult, JobClient}
import org.apache.flink.table.api._
import org.apache.flink.table.calcite.{CalciteConfig, FlinkTypeFactory}
-import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogManager}
+import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogManager, ObjectIdentifier}
import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, PlannerQueryOperation, QueryOperation}
import org.apache.flink.table.plan.BatchOptimizer
import org.apache.flink.table.plan.nodes.LogicalSink
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
@@ -57,6 +57,7 @@ import org.apache.flink.util.Preconditions.checkNotNull
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalTableModify
import _root_.java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList}
@@ -225,11 +226,25 @@ abstract class BatchTableEnvImpl(
explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
}
- private def explain(operations: JList[Operation], extended: Boolean): String = {
+ protected def explain(operations: JList[Operation], extended: Boolean): String = {
require(operations.asScala.nonEmpty, "operations should not be empty")
val astList = operations.asScala.map {
case queryOperation: QueryOperation =>
- getRelBuilder.tableOperation(queryOperation).build()
+ val relNode = getRelBuilder.tableOperation(queryOperation).build()
+ relNode match {
+ // SQL: explain plan for insert into xx
+ case modify: LogicalTableModify =>
+ // convert LogicalTableModify to CatalogSinkModifyOperation
+ val qualifiedName = modify.getTable.getQualifiedName
+ require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+ val modifyOperation = new CatalogSinkModifyOperation(
+ ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+ new PlannerQueryOperation(modify.getInput)
+ )
+ translateToRel(modifyOperation, addLogicalSink = true)
+ case _ =>
+ relNode
+ }
case modifyOperation: ModifyOperation =>
translateToRel(modifyOperation, addLogicalSink = true)
case o => throw new TableException(s"Unsupported operation: ${o.asSummaryString()}")
@@ -237,27 +252,33 @@ abstract class BatchTableEnvImpl(
val optimizedNodes = astList.map(optimizer.optimize)
- val batchTableEnv = createDummyBatchTableEnv()
- val dataSinks = optimizedNodes.zip(operations.asScala).map {
- case (optimizedNode, operation) =>
- operation match {
- case queryOperation: QueryOperation =>
- val dataSet = translate[Row](
- optimizedNode,
- getTableSchema(queryOperation.getTableSchema.getFieldNames, optimizedNode))(
- new GenericTypeInfo(classOf[Row]))
- dataSet.output(new DiscardingOutputFormat[Row])
- case modifyOperation: ModifyOperation =>
- val tableSink = getTableSink(modifyOperation)
- translate(
- batchTableEnv,
- optimizedNode,
- tableSink,
- getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
- case o =>
- throw new TableException("Unsupported Operation: " + o.asSummaryString())
- }
- }
+ val batchTableEnv = createDummyBatchTableEnv()
+ val dataSinks = optimizedNodes.zip(operations.asScala).map {
+ case (optimizedNode, operation) =>
+ operation match {
+ case queryOperation: QueryOperation =>
+ val fieldNames = queryOperation match {
+ case o: PlannerQueryOperation if o.getCalciteTree.isInstanceOf[LogicalTableModify] =>
+ o.getCalciteTree.getInput(0).getRowType.getFieldNames.asScala.toArray[String]
+ case _ =>
+ queryOperation.getTableSchema.getFieldNames
+ }
+ val dataSet = translate[Row](
+ optimizedNode,
+ getTableSchema(fieldNames, optimizedNode))(
+ new GenericTypeInfo(classOf[Row]))
+ dataSet.output(new DiscardingOutputFormat[Row])
+ case modifyOperation: ModifyOperation =>
+ val tableSink = getTableSink(modifyOperation)
+ translate(
+ batchTableEnv,
+ optimizedNode,
+ tableSink,
+ getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+ case o =>
+ throw new TableException("Unsupported Operation: " + o.asSummaryString())
+ }
+ }
val astPlan = astList.map(RelOptUtil.toString).mkString(System.lineSeparator)
val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3a97106..7c6f144 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -545,8 +545,6 @@ abstract class TableEnvImpl(
override def listFunctions(): Array[String] = functionCatalog.getFunctions
- override def explain(table: Table): String
-
override def getCompletionHints(statement: String, position: Int): Array[String] = {
val planner = getFlinkPlanner
planner.getCompletionHints(statement, position)
@@ -763,6 +761,15 @@ abstract class TableEnvImpl(
TableResultImpl.TABLE_RESULT_OK
case _: ShowViewsOperation =>
buildShowResult(listViews())
+ case explainOperation: ExplainOperation =>
+ val explanation = explain(
+ JCollections.singletonList(explainOperation.getChild),
+ extended = false)
+ TableResultImpl.builder.
+ resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
+ .data(JCollections.singletonList(Row.of(explanation)))
+ .build
case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
}
}
@@ -1128,6 +1135,8 @@ abstract class TableEnvImpl(
}
}
+ protected def explain(operations: JList[Operation], extended: Boolean): String
+
override def fromValues(values: Expression*): Table = {
createTable(operationTreeBuilder.values(values: _*))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index b4c6210..3d7dae4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.rel.RelRoot
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
-import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
import _root_.java.lang.{Boolean => JBoolean}
@@ -127,7 +127,14 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowViews]) {
return sqlNode
}
- validator.validate(sqlNode)
+ sqlNode match {
+ case explain: SqlExplain =>
+ val validated = validator.validate(explain.getExplicandum)
+ explain.setOperand(0, validated)
+ explain
+ case _ =>
+ validator.validate(sqlNode)
+ }
}
catch {
case e: RuntimeException =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index f549168..756d9ca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -43,6 +43,7 @@ import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalTableModify
import _root_.java.util
import _root_.java.util.Objects
@@ -123,7 +124,21 @@ class StreamPlanner(
require(operations.asScala.nonEmpty, "operations should not be empty")
val astWithUpdatesAsRetractionTuples = operations.asScala.map {
case queryOperation: QueryOperation =>
- (getRelBuilder.tableOperation(queryOperation).build(), false)
+ val relNode = getRelBuilder.tableOperation(queryOperation).build()
+ relNode match {
+ // SQL: explain plan for insert into xx
+ case modify: LogicalTableModify =>
+ // convert LogicalTableModify to CatalogSinkModifyOperation
+ val qualifiedName = modify.getTable.getQualifiedName
+ require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+ val modifyOperation = new CatalogSinkModifyOperation(
+ ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+ new PlannerQueryOperation(modify.getInput)
+ )
+ translateToRel(modifyOperation)
+ case _ =>
+ (relNode, false)
+ }
case modifyOperation: ModifyOperation =>
translateToRel(modifyOperation)
case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index c5e90bf..dd83a3e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -24,13 +24,14 @@ import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecutionEnvironment}
-import org.apache.flink.table.api.TableEnvironmentITCase.{getPersonCsvTableSource, getPersonData, readFromResource, replaceStageId}
+import org.apache.flink.table.api.TableEnvironmentITCase.{getPersonCsvTableSource, getPersonData}
import org.apache.flink.table.api.internal.TableEnvironmentImpl
import org.apache.flink.table.api.java.StreamTableEnvironment
import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnvironment, _}
import org.apache.flink.table.runtime.utils.StreamITCase
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
import org.apache.flink.table.utils.TestingOverwritableTableSink
import org.apache.flink.types.Row
import org.apache.flink.util.FileUtils
@@ -46,7 +47,6 @@ import _root_.java.lang.{Long => JLong}
import _root_.java.util
import _root_.scala.collection.mutable
-import _root_.scala.io.Source
@RunWith(classOf[Parameterized])
@@ -452,15 +452,6 @@ object TableEnvironmentITCase {
)
}
- def readFromResource(file: String): String = {
- val source = s"${getClass.getResource("/").getFile}../../src/test/scala/resources/$file"
- Source.fromFile(source).mkString
- }
-
- def replaceStageId(s: String): String = {
- s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
- }
-
def getPersonCsvTableSource: CsvTableSource = {
val header = "First#Id#Score#Last"
val csvRecords = getPersonData.map {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index 9823e08..d09bd5d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -25,10 +25,11 @@ import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
import org.apache.flink.table.runtime.stream.sql.FunctionITCase.{SimpleScalarFunction, TestUDF}
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
import org.apache.flink.types.Row
import org.hamcrest.Matchers.containsString
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.Test
import java.util
@@ -341,6 +342,123 @@ class BatchTableEnvironmentTest extends TableTestBase {
tableResult4.collect())
}
+ @Test
+ def testExecuteSqlWithExplainSelect(): Unit = {
+ val util = batchTestUtil()
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val tableResult2 = util.tableEnv.executeSql(
+ "explain plan for select * from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+ val it = tableResult2.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity)
+ val actual = row.getField(0).toString
+ val expected = readFromResource("testExecuteSqlWithExplainSelect1.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ assertFalse(it.hasNext)
+ }
+
+ @Test
+ def testExecuteSqlWithExplainInsert(): Unit = {
+ val util = batchTestUtil()
+ val createTableStmt1 =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val createTableStmt2 =
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+ val tableResult3 = util.tableEnv.executeSql(
+ "explain plan for insert into MySink select a, b from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+ val it = tableResult3.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity)
+ val actual = row.getField(0).toString
+ val expected = readFromResource("testExecuteSqlWithExplainInsert1.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ assertFalse(it.hasNext)
+ }
+
+ @Test
+ def testExecuteSqlWithUnsupportedExplain(): Unit = {
+ val util = batchTestUtil()
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ // TODO we can support them later
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan excluding attributes for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan including all attributes for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan with type for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan without implementation for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan as xml for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan as json for select * from MyTable")
+ }
+
+ private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
+ try {
+ tableEnv.executeSql(explain)
+ fail("This should not happen")
+ } catch {
+ case e: TableException =>
+ assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+ case e =>
+ fail("This should not happen, " + e.getMessage)
+ }
+ }
+
private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
while (expected.hasNext && actual.hasNext) {
assertEquals(expected.next(), actual.next())
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 809d0aa..439fadb 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -29,20 +29,21 @@ import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => JStreamTableEnvironmentImpl}
import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{EnvironmentSettings, Expressions, TableConfig, Types, ValidationException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.api.{EnvironmentSettings, ResultKind, TableConfig, TableException, Types, ValidationException}
+import org.apache.flink.table.catalog.FunctionCatalog
import org.apache.flink.table.executor.StreamExecutor
+import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.planner.StreamPlanner
import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.TableTestUtil.{binaryNode, readFromResource, replaceStageId, streamTableNode, term, unaryNode}
import org.apache.flink.table.utils.{CatalogManagerMocks, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
import org.apache.flink.types.Row
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.Test
import org.mockito.Mockito.{mock, when}
import java.lang.{Integer => JInt, Long => JLong}
-import org.apache.flink.table.module.ModuleManager
class StreamTableEnvironmentTest extends TableTestBase {
@@ -208,6 +209,123 @@ class StreamTableEnvironmentTest extends TableTestBase {
jTEnv.fromDataStream(ds, $("rt").rowtime(), $("b"), $("c"), $("d"), $("e"), $("pt").proctime())
}
+ @Test
+ def testExecuteSqlWithExplainSelect(): Unit = {
+ val util = streamTestUtil()
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val tableResult2 = util.tableEnv.executeSql(
+ "explain plan for select * from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+ val it = tableResult2.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity)
+ val actual = row.getField(0).toString
+ val expected = readFromResource("testExecuteSqlWithExplainSelect0.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ assertFalse(it.hasNext)
+ }
+
+ @Test
+ def testExecuteSqlWithExplainInsert(): Unit = {
+ val util = streamTestUtil()
+ val createTableStmt1 =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val createTableStmt2 =
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+ val tableResult3 = util.tableEnv.executeSql(
+ "explain plan for insert into MySink select a, b from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+ val it = tableResult3.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity)
+ val actual = row.getField(0).toString
+ val expected = readFromResource("testExecuteSqlWithExplainInsert0.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ assertFalse(it.hasNext)
+ }
+
+ @Test
+ def testExecuteSqlWithUnsupportedExplain(): Unit = {
+ val util = streamTestUtil()
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ // TODO we can support them later
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan excluding attributes for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan including all attributes for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan with type for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan without implementation for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan as xml for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan as json for select * from MyTable")
+ }
+
+ private def testUnsupportedExplain(tableEnv: StreamTableEnvironment, explain: String): Unit = {
+ try {
+ tableEnv.executeSql(explain)
+ fail("This should not happen")
+ } catch {
+ case e: TableException =>
+ assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+ case e =>
+ fail("This should not happen, " + e.getMessage)
+ }
+ }
+
private def prepareSchemaExpressionParser:
(JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
new file mode 100644
index 0000000..aad6387
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+ LogicalProject(d=[$0], e=[$1])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+ DataStreamCalc(select=[a, b], where=[>(a, 10)])
+ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : from: (a, b)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : where: (>(a, 10)), select: (a, b)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : to: Row
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: Unnamed
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
new file mode 100644
index 0000000..98206ae
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
@@ -0,0 +1,36 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+ LogicalProject(d=[$0], e=[$1])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+ DataSetCalc(select=[a, b], where=[>(a, 10)])
+ BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : from: (a, b)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : FlatMap
+ content : where: (>(a, 10)), select: (a, b)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : FlatMap
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : org.apache.flink.api.java.io.DiscardingOutputFormat
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
new file mode 100644
index 0000000..4459ad6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b, c], where=[>(a, 10)])
+ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : Map
+ ship_strategy : FORWARD
+
+ : Operator
+ content : where: (>(a, 10)), select: (a, b, c)
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
new file mode 100644
index 0000000..91e87ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b, c], where=[>(a, 10)])
+ BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : FlatMap
+ content : where: (>(a, 10)), select: (a, b, c)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : FlatMap
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : org.apache.flink.api.java.io.DiscardingOutputFormat
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+
[flink] 05/05: [FLINK-17267] [table] Introduce Table#explain api
Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4959ebfc349bb80f88582f9ef2d60d09f1140737
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Apr 24 23:04:33 2020 +0800
[FLINK-17267] [table] Introduce Table#explain api
This closes #11905
---
flink-python/pyflink/table/explain_detail.py | 4 +-
flink-python/pyflink/table/table.py | 14 ++++++
flink-python/pyflink/table/table_environment.py | 5 +--
.../{explain_detail.py => tests/test_explain.py} | 30 ++++++++-----
.../table/tests/test_table_environment_api.py | 2 +-
flink-python/pyflink/util/utils.py | 4 +-
.../org/apache/flink/table/api/ExplainDetail.java | 4 +-
.../java/org/apache/flink/table/api/Table.java | 10 +++++
.../table/api/internal/TableEnvironmentImpl.java | 8 +++-
.../api/internal/TableEnvironmentInternal.java | 14 ++++++
.../apache/flink/table/api/internal/TableImpl.java | 6 +++
.../flink/table/api/internal/TableResultImpl.java | 51 +++++++++++++++++++---
.../table/planner/delegation/StreamPlanner.scala | 2 +-
.../flink/table/api/TableEnvironmentTest.scala | 24 +++++++++-
.../table/api/internal/BatchTableEnvImpl.scala | 8 ++--
.../flink/table/api/internal/TableEnvImpl.scala | 8 ++--
.../api/batch/BatchTableEnvironmentTest.scala | 22 ++++++++++
.../api/stream/StreamTableEnvironmentTest.scala | 22 ++++++++++
18 files changed, 202 insertions(+), 36 deletions(-)
diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py
index 48e7ce9..0cbcbe9 100644
--- a/flink-python/pyflink/table/explain_detail.py
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -29,6 +29,6 @@ class ExplainDetail(object):
# 0.0 memory}
ESTIMATED_COST = 0
- # The changelog traits produced by a physical rel node.
+ # The changelog mode produced by a physical rel node.
# e.g. GroupAggregate(..., changelogMode=[I,UA,D])
- CHANGELOG_TRAITS = 1
+ CHANGELOG_MODE = 1
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index 728d331..b74797e 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -21,6 +21,7 @@ from pyflink.java_gateway import get_gateway
from pyflink.table.table_schema import TableSchema
from pyflink.util.utils import to_jarray
+from pyflink.util.utils import to_j_explain_detail_arr
__all__ = ['Table', 'GroupedTable', 'GroupWindowedTable', 'OverWindowedTable', 'WindowGroupedTable']
@@ -718,6 +719,19 @@ class Table(object):
"""
self._j_table.executeInsert(table_path, overwrite)
+ def explain(self, *extra_details):
+ """
+ Returns the AST of this table and the execution plan.
+
+ :param extra_details: The extra explain details which the explain result should include,
+ e.g. estimated cost, changelog mode for streaming
+ :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
+ :return: The AST of this table and the execution plan.
+ :rtype: str
+ """
+ j_extra_details = to_j_explain_detail_arr(extra_details)
+ return self._j_table.explain(j_extra_details)
+
def __str__(self):
return self._j_table.toString()
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 94ff785..91073d8 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -471,13 +471,12 @@ class TableEnvironment(object):
def explain_sql(self, stmt, *extra_details):
"""
- Returns the AST of the specified statement and the execution plan to compute
- the result of the given statement.
+ Returns the AST of the specified statement and the execution plan.
:param stmt: The statement for which the AST and execution plan will be returned.
:type stmt: str
:param extra_details: The extra explain details which the explain result should include,
- e.g. estimated cost, change log trait for streaming
+ e.g. estimated cost, changelog mode for streaming
:type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
:return: The statement for which the AST and execution plan will be returned.
:rtype: str
diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/tests/test_explain.py
similarity index 58%
copy from flink-python/pyflink/table/explain_detail.py
copy to flink-python/pyflink/table/tests/test_explain.py
index 48e7ce9..1dccaba 100644
--- a/flink-python/pyflink/table/explain_detail.py
+++ b/flink-python/pyflink/table/tests/test_explain.py
@@ -16,19 +16,25 @@
# limitations under the License.
################################################################################
-__all__ = ['ExplainDetail']
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+from pyflink.table.explain_detail import ExplainDetail
-class ExplainDetail(object):
- """
- ExplainDetail defines the types of details for explain result.
- """
+class StreamTableExplainTests(PyFlinkStreamTableTestCase):
- # The cost information on physical rel node estimated by optimizer.
- # e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network,
- # 0.0 memory}
- ESTIMATED_COST = 0
+ def test_explain(self):
+ t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
+ result = t.group_by("c").select("a.sum, c as b").explain(ExplainDetail.CHANGELOG_MODE)
- # The changelog traits produced by a physical rel node.
- # e.g. GroupAggregate(..., changelogMode=[I,UA,D])
- CHANGELOG_TRAITS = 1
+ assert isinstance(result, str)
+
+
+if __name__ == '__main__':
+ import unittest
+
+ try:
+ import xmlrunner
+ testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+ except ImportError:
+ testRunner = None
+ unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index bd279af..96987de 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -265,7 +265,7 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
source_sink_utils.TestAppendSink(field_names, field_types))
result = t_env.explain_sql(
- "select a + 1, b, c from %s" % source, ExplainDetail.ESTIMATED_COST)
+ "select a + 1, b, c from %s" % source, ExplainDetail.CHANGELOG_MODE)
assert isinstance(result, str)
diff --git a/flink-python/pyflink/util/utils.py b/flink-python/pyflink/util/utils.py
index 29a20da..065b537 100644
--- a/flink-python/pyflink/util/utils.py
+++ b/flink-python/pyflink/util/utils.py
@@ -134,8 +134,8 @@ def to_j_explain_detail_arr(p_extra_details):
gateway = get_gateway()
def to_j_explain_detail(p_extra_detail):
- if p_extra_detail == ExplainDetail.CHANGELOG_TRAITS:
- return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_TRAITS
+ if p_extra_detail == ExplainDetail.CHANGELOG_MODE:
+ return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_MODE
else:
return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
index 6e9d014..5dfddc3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
@@ -29,8 +29,8 @@ public enum ExplainDetail {
ESTIMATED_COST,
/**
- * The changelog traits produced by a physical rel node.
+ * The changelog mode produced by a physical rel node.
* e.g. GroupAggregate(..., changelogMode=[I,UA,D])
*/
- CHANGELOG_TRAITS
+ CHANGELOG_MODE
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 3362ca5..d1239e6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1473,4 +1473,14 @@ public interface Table {
* @return The insert operation execution result.
*/
TableResult executeInsert(String tablePath, boolean overwrite);
+
+ /**
+ * Returns the AST of this table and the execution plan to compute
+ * the result of this table.
+ *
+ * @param extraDetails The extra explain details which the explain result should include,
+ * e.g. estimated cost, changelog mode for streaming
+ * @return AST and the execution plan.
+ */
+ String explain(ExplainDetail... extraDetails);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 610627c..490e416 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -614,6 +614,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
}
@Override
+ public String explainInternal(List<Operation> operations, ExplainDetail... extraDetails) {
+ return planner.explain(operations, extraDetails);
+ }
+
+ @Override
public String[] getCompletionHints(String statement, int position) {
return planner.getCompletionHints(statement, position);
}
@@ -873,6 +878,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
.data(Collections.singletonList(Row.of(explanation)))
+ .setPrintStyle(TableResultImpl.PrintStyle.RAW_CONTENT)
.build();
} else {
@@ -997,7 +1003,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
protected ExplainDetail[] getExplainDetails(boolean extended) {
if (extended) {
if (isStreamingMode) {
- return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS };
+ return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE };
} else {
return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST };
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
index 2319538..228713c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
@@ -19,11 +19,13 @@
package org.apache.flink.table.api.internal;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
import java.util.List;
@@ -56,4 +58,16 @@ interface TableEnvironmentInternal extends TableEnvironment {
* @return the affected row counts (-1 means unknown).
*/
TableResult executeInternal(List<ModifyOperation> operations);
+
+ /**
+ * Returns the AST of the given operations and the execution plan to compute
+ * the result of these operations.
+ *
+ * @param operations The operations to be explained.
+ * @param extraDetails The extra explain details which the explain result should include,
+ * e.g. estimated cost, changelog mode for streaming
+ * @return AST and the execution plan.
+ */
+ String explainInternal(List<Operation> operations, ExplainDetail... extraDetails);
+
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 9b2e319..d3a63aa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.AggregatedTable;
+import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.FlatAggregateTable;
import org.apache.flink.table.api.GroupWindow;
import org.apache.flink.table.api.GroupWindowedTable;
@@ -565,6 +566,11 @@ public class TableImpl implements Table {
}
@Override
+ public String explain(ExplainDetail... extraDetails) {
+ return tableEnvironment.explainInternal(Collections.singletonList(getQueryOperation()), extraDetails);
+ }
+
+ @Override
public String toString() {
if (tableName == null) {
tableName = "UnnamedTable$" + uniqueId.getAndIncrement();
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
index 791ee89..a783976 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.utils.PrintUtils;
@@ -51,16 +52,19 @@ class TableResultImpl implements TableResult {
private final TableSchema tableSchema;
private final ResultKind resultKind;
private final Iterator<Row> data;
+ private final PrintStyle printStyle;
private TableResultImpl(
@Nullable JobClient jobClient,
TableSchema tableSchema,
ResultKind resultKind,
- Iterator<Row> data) {
+ Iterator<Row> data,
+ PrintStyle printStyle) {
this.jobClient = jobClient;
this.tableSchema = Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.data = Preconditions.checkNotNull(data, "data should not be null");
+ this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
}
@Override
@@ -86,7 +90,18 @@ class TableResultImpl implements TableResult {
@Override
public void print() {
Iterator<Row> it = collect();
- PrintUtils.printAsTableauForm(getTableSchema(), it, new PrintWriter(System.out));
+ switch (printStyle) {
+ case TABLEAU:
+ PrintUtils.printAsTableauForm(getTableSchema(), it, new PrintWriter(System.out));
+ break;
+ case RAW_CONTENT:
+ while (it.hasNext()) {
+ System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
+ }
+ break;
+ default:
+ throw new TableException("Unsupported print style: " + printStyle);
+ }
}
public static Builder builder() {
@@ -101,6 +116,7 @@ class TableResultImpl implements TableResult {
private TableSchema tableSchema = null;
private ResultKind resultKind = null;
private Iterator<Row> data = null;
+ private PrintStyle printStyle = PrintStyle.TABLEAU;
private Builder() {
}
@@ -138,7 +154,7 @@ class TableResultImpl implements TableResult {
}
/**
- * Specifies an row iterator as the execution result .
+ * Specifies a row iterator as the execution result.
*
* @param rowIterator a row iterator as the execution result.
*/
@@ -149,7 +165,7 @@ class TableResultImpl implements TableResult {
}
/**
- * Specifies an row list as the execution result .
+ * Specifies a row list as the execution result.
*
* @param rowList a row list as the execution result.
*/
@@ -160,11 +176,36 @@ class TableResultImpl implements TableResult {
}
/**
+ * Specifies print style. Default is {@link PrintStyle#TABLEAU}.
+ */
+ public Builder setPrintStyle(PrintStyle printStyle) {
+ Preconditions.checkNotNull(printStyle, "printStyle should not be null");
+ this.printStyle = printStyle;
+ return this;
+ }
+
+ /**
* Returns a {@link TableResult} instance.
*/
public TableResult build() {
- return new TableResultImpl(jobClient, tableSchema, resultKind, data);
+ return new TableResultImpl(jobClient, tableSchema, resultKind, data, printStyle);
}
}
+ /**
+ * PrintStyle defines the styles of printing.
+ */
+ public enum PrintStyle {
+ /**
+ * print the result schema and content as tableau form.
+ */
+ TABLEAU,
+
+ /**
+ * only print the result content as raw form.
+ * column delimiter is ",", row delimiter is "\n".
+ */
+ RAW_CONTENT
+ }
+
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 959de06..5dc6a6f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -114,7 +114,7 @@ class StreamPlanner(
} else {
SqlExplainLevel.DIGEST_ATTRIBUTES
}
- val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_TRAITS)
+ val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_MODE)
sb.append(ExecNodePlanDumper.dagToString(
execNodes,
explainLevel,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 0e197ba..aa6bd2e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -911,7 +911,7 @@ class TableEnvironmentTest {
assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
val actual = tableEnv.explainSql(
- "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_TRAITS)
+ "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_MODE)
val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
assertEquals(replaceStageId(expected), replaceStageId(actual))
}
@@ -951,6 +951,28 @@ class TableEnvironmentTest {
assertEquals(replaceStageId(expected), replaceStageId(actual))
}
+ @Test
+ def testTableExplain(): Unit = {
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val actual = tableEnv.sqlQuery("select * from MyTable where a > 10")
+ .explain(ExplainDetail.CHANGELOG_MODE)
+ val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ }
+
private def testUnsupportedExplain(explain: String): Unit = {
try {
tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index b3caf20..7ba116d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -217,7 +217,7 @@ abstract class BatchTableEnvImpl(
* @param extended Flag to include detailed optimizer estimates.
*/
private[flink] def explain(table: Table, extended: Boolean): String = {
- explain(
+ explainInternal(
JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]),
getExplainDetails(extended): _*)
}
@@ -225,12 +225,14 @@ abstract class BatchTableEnvImpl(
override def explain(table: Table): String = explain(table: Table, extended = false)
override def explain(extended: Boolean): String = {
- explain(
+ explainInternal(
bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava,
getExplainDetails(extended): _*)
}
- protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String = {
+ protected override def explainInternal(
+ operations: JList[Operation],
+ extraDetails: ExplainDetail*): String = {
require(operations.asScala.nonEmpty, "operations should not be empty")
val astList = operations.asScala.map {
case queryOperation: QueryOperation =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 1f01186..4c6cbd4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.core.execution.JobClient
import org.apache.flink.table.api._
+import org.apache.flink.table.api.internal.TableResultImpl.PrintStyle
import org.apache.flink.table.calcite.{CalciteParser, FlinkPlannerImpl, FlinkRelBuilder}
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.exceptions.{TableNotExistException => _, _}
@@ -762,11 +763,12 @@ abstract class TableEnvImpl(
case _: ShowViewsOperation =>
buildShowResult(listViews())
case explainOperation: ExplainOperation =>
- val explanation = explain(JCollections.singletonList(explainOperation.getChild))
+ val explanation = explainInternal(JCollections.singletonList(explainOperation.getChild))
TableResultImpl.builder.
resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
.data(JCollections.singletonList(Row.of(explanation)))
+ .setPrintStyle(PrintStyle.RAW_CONTENT)
.build
case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
@@ -1142,10 +1144,10 @@ abstract class TableEnvImpl(
"Unsupported SQL query! explainSql() only accepts a single SQL query.")
}
- explain(operations, extraDetails: _*)
+ explainInternal(operations, extraDetails: _*)
}
- protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String
+ protected def explainInternal(operations: JList[Operation], extraDetails: ExplainDetail*): String
override def fromValues(values: Expression*): Table = {
createTable(operationTreeBuilder.values(values: _*))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index c928314..74d820e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -504,6 +504,28 @@ class BatchTableEnvironmentTest extends TableTestBase {
assertEquals(replaceStageId(expected), replaceStageId(actual))
}
+ @Test
+ def testTableExplain(): Unit = {
+ val util = batchTestUtil()
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val actual = util.tableEnv.sqlQuery("select * from MyTable where a > 10").explain()
+ val expected = readFromResource("testExplainSqlWithSelect1.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ }
+
private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
try {
tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 25bb536..bb710c6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -384,6 +384,28 @@ class StreamTableEnvironmentTest extends TableTestBase {
assertEquals(replaceStageId(expected), replaceStageId(actual))
}
+ @Test
+ def testTableExplain(): Unit = {
+ val util = streamTestUtil()
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val actual = util.tableEnv.sqlQuery("select * from MyTable where a > 10").explain()
+ val expected = readFromResource("testExplainSqlWithSelect0.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ }
+
private def prepareSchemaExpressionParser:
(JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
[flink] 02/05: [FLINK-17267] [table-planner] legacy batch planner
supports explain insert operation
Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2fc82ad5b92367c9f9a2aad5e6df3fd78108754e
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Apr 24 20:45:12 2020 +0800
[FLINK-17267] [table-planner] legacy batch planner supports explain insert operation
---
.../table/tests/test_table_environment_api.py | 5 +-
.../table/api/internal/BatchTableEnvImpl.scala | 191 +++++++++++++++++----
.../flink/table/api/internal/TableEnvImpl.scala | 63 ++++++-
.../table/plan/nodes/dataset/DataSetSink.scala | 57 ++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 3 +-
.../table/plan/rules/dataSet/DataSetSinkRule.scala | 55 ++++++
.../apache/flink/table/api/batch/ExplainTest.scala | 71 ++++++--
.../sql/validation/InsertIntoValidationTest.scala | 8 +
.../validation/InsertIntoValidationTest.scala | 4 +
.../src/test/scala/resources/testInsert1.out | 27 +++
.../test/scala/resources/testMultipleInserts1.out | 51 ++++++
11 files changed, 477 insertions(+), 58 deletions(-)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index c1bf04a..87c8023 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -400,8 +400,9 @@ class BatchTableEnvironmentTests(TableEnvironmentTest, PyFlinkBatchTableTestCase
t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
- with self.assertRaises(TableException):
- t_env.explain(extended=True)
+ actual = t_env.explain(extended=True)
+
+ assert isinstance(actual, str)
def test_create_table_environment(self):
table_config = TableConfig()
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 343fc23..e25b8e4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -38,8 +38,9 @@ import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{DataSetQueryOperation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, QueryOperation}
import org.apache.flink.table.plan.BatchOptimizer
+import org.apache.flink.table.plan.nodes.LogicalSink
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.planner.Conversions
import org.apache.flink.table.runtime.MapRunner
@@ -74,7 +75,7 @@ abstract class BatchTableEnvImpl(
moduleManager: ModuleManager)
extends TableEnvImpl(config, catalogManager, moduleManager) {
- private val bufferedSinks = new JArrayList[DataSink[_]]
+ private val bufferedModifyOperations = new JArrayList[ModifyOperation]()
private[flink] val optimizer = new BatchOptimizer(
() => config.getPlannerConfig.unwrap(classOf[CalciteConfig]).orElse(CalciteConfig.DEFAULT),
@@ -170,8 +171,8 @@ abstract class BatchTableEnvImpl(
}
}
- override protected def addToBuffer(sink: DataSink[_]): Unit = {
- bufferedSinks.add(sink)
+ override protected def addToBuffer[T](modifyOperation: ModifyOperation): Unit = {
+ bufferedModifyOperations.add(modifyOperation)
}
/**
@@ -215,32 +216,69 @@ abstract class BatchTableEnvImpl(
* @param extended Flag to include detailed optimizer estimates.
*/
private[flink] def explain(table: Table, extended: Boolean): String = {
- val ast = getRelBuilder.tableOperation(table.getQueryOperation).build()
- val optimizedPlan = optimizer.optimize(ast)
- val dataSet = translate[Row](
- optimizedPlan,
- getTableSchema(table.getQueryOperation.getTableSchema.getFieldNames, optimizedPlan))(
- new GenericTypeInfo(classOf[Row]))
- dataSet.output(new DiscardingOutputFormat[Row])
- val env = dataSet.getExecutionEnvironment
+ explain(JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), extended)
+ }
+
+ override def explain(table: Table): String = explain(table: Table, extended = false)
+
+ override def explain(extended: Boolean): String = {
+ explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
+ }
+
+ private def explain(operations: JList[Operation], extended: Boolean): String = {
+ require(operations.asScala.nonEmpty, "operations should not be empty")
+ val astList = operations.asScala.map {
+ case queryOperation: QueryOperation =>
+ getRelBuilder.tableOperation(queryOperation).build()
+ case modifyOperation: ModifyOperation =>
+ translateToRel(modifyOperation, addLogicalSink = true)
+ case o => throw new TableException(s"Unsupported operation: ${o.asSummaryString()}")
+ }
+
+ val optimizedNodes = astList.map(optimizer.optimize)
+
+ val batchTableEnv = createDummyBatchTableEnv()
+ val dataSinks = optimizedNodes.zip(operations.asScala).map {
+ case (optimizedNode, operation) =>
+ operation match {
+ case queryOperation: QueryOperation =>
+ val dataSet = translate[Row](
+ optimizedNode,
+ getTableSchema(queryOperation.getTableSchema.getFieldNames, optimizedNode))(
+ new GenericTypeInfo(classOf[Row]))
+ dataSet.output(new DiscardingOutputFormat[Row])
+ case modifyOperation: ModifyOperation =>
+ val tableSink = getTableSink(modifyOperation)
+ translate(
+ batchTableEnv,
+ optimizedNode,
+ tableSink,
+ getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+ case o =>
+ throw new TableException("Unsupported Operation: " + o.asSummaryString())
+ }
+ }
+
+ val astPlan = astList.map(RelOptUtil.toString).mkString(System.lineSeparator)
+ val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator)
+
+ val env = dataSinks.head.getDataSet.getExecutionEnvironment
val jasonSqlPlan = env.getExecutionPlan
val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
s"== Abstract Syntax Tree ==" +
- System.lineSeparator +
- s"${RelOptUtil.toString(ast)}" +
- System.lineSeparator +
- s"== Optimized Logical Plan ==" +
- System.lineSeparator +
- s"${RelOptUtil.toString(optimizedPlan)}" +
- System.lineSeparator +
- s"== Physical Execution Plan ==" +
- System.lineSeparator +
- s"$sqlPlan"
+ System.lineSeparator +
+ s"$astPlan" +
+ System.lineSeparator +
+ s"== Optimized Logical Plan ==" +
+ System.lineSeparator +
+ s"$optimizedPlan" +
+ System.lineSeparator +
+ s"== Physical Execution Plan ==" +
+ System.lineSeparator +
+ s"$sqlPlan"
}
- override def explain(table: Table): String = explain(table: Table, extended = false)
-
override def execute(jobName: String): JobExecutionResult = {
val plan = createPipelineAndClearBuffer(jobName)
@@ -292,10 +330,6 @@ abstract class BatchTableEnvImpl(
createPipelineAndClearBuffer(jobName)
}
- override def explain(extended: Boolean): String = {
- throw new TableException("This method is unsupported in old planner.")
- }
-
/**
* Translate the buffered sinks to Plan, and clear the buffer.
*
@@ -304,10 +338,11 @@ abstract class BatchTableEnvImpl(
* If the buffer is not clear after failure, the following `translate` will also fail.
*/
private def createPipelineAndClearBuffer(jobName: String): Pipeline = {
+ val dataSinks = translate(bufferedModifyOperations)
try {
- createPipeline(bufferedSinks, jobName)
+ createPipeline(dataSinks, jobName)
} finally {
- bufferedSinks.clear()
+ bufferedModifyOperations.clear()
}
}
@@ -357,6 +392,102 @@ abstract class BatchTableEnvImpl(
}
/**
+ * Translates a [[ModifyOperation]] into a [[RelNode]].
+ *
+ * The transformation does not involve optimizing the relational expression tree.
+ *
+ * @param modifyOperation The root ModifyOperation of the relational expression tree.
+ * @param addLogicalSink Whether add [[LogicalSink]] as the root.
+ * Currently, LogicalSink is only used for explaining.
+ * @return The [[RelNode]] that corresponds to the translated [[ModifyOperation]].
+ */
+ private def translateToRel(modifyOperation: ModifyOperation, addLogicalSink: Boolean): RelNode = {
+ val input = getRelBuilder.tableOperation(modifyOperation.getChild).build()
+ if (addLogicalSink) {
+ val tableSink = getTableSink(modifyOperation)
+ modifyOperation match {
+ case s: CatalogSinkModifyOperation =>
+ LogicalSink.create(input, tableSink, s.getTableIdentifier.toString)
+ case o =>
+ throw new TableException("Unsupported Operation: " + o.asSummaryString())
+ }
+ } else {
+ input
+ }
+ }
+
+ /**
+ * Translates a list of [[ModifyOperation]] into a list of [[DataSink]].
+ *
+ * The transformation involves optimizing the relational expression tree as defined by
+ * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
+ *
+ * @param modifyOperations The root [[ModifyOperation]]s of the relational expression tree.
+ * @return The [[DataSink]] that corresponds to the translated [[ModifyOperation]]s.
+ */
+ private def translate[T](modifyOperations: JList[ModifyOperation]): JList[DataSink[_]] = {
+ val relNodes = modifyOperations.asScala.map(o => translateToRel(o, addLogicalSink = false))
+ val optimizedNodes = relNodes.map(optimizer.optimize)
+
+ val batchTableEnv = createDummyBatchTableEnv()
+ modifyOperations.asScala.zip(optimizedNodes).map {
+ case (modifyOperation, optimizedNode) =>
+ val tableSink = getTableSink(modifyOperation)
+ translate(
+ batchTableEnv,
+ optimizedNode,
+ tableSink,
+ getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+ }.asJava
+ }
+
+ /**
+ * Translates an optimized [[RelNode]] into a [[DataSet]]
+ * and hands it over to the [[TableSink]] to be written.
+ *
+ * @param optimizedNode The [[RelNode]] to translate.
+ * @param tableSink The [[TableSink]] to write the [[Table]] to.
+ * @return The [[DataSink]] that corresponds to the [[RelNode]] and the [[TableSink]].
+ */
+ private def translate[T](
+ batchTableEnv: BatchTableEnvImpl,
+ optimizedNode: RelNode,
+ tableSink: TableSink[T],
+ tableSchema: TableSchema): DataSink[_] = {
+ tableSink match {
+ case batchSink: BatchTableSink[T] =>
+ val outputType = fromDataTypeToLegacyInfo(tableSink.getConsumedDataType)
+ .asInstanceOf[TypeInformation[T]]
+ // translate the Table into a DataSet and provide the type that the TableSink expects.
+ val result: DataSet[T] = translate(optimizedNode, tableSchema)(outputType)
+ // create a dummy NoOpOperator, which holds dummy DummyExecutionEnvironment as context.
+ // NoOpOperator will be ignored in OperatorTranslation
+ // when translating DataSet to Operator, while its input can be translated normally.
+ val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType)
+ // Give the DataSet to the TableSink to emit it.
+ batchSink.consumeDataSet(dummyOp)
+ case boundedSink: OutputFormatTableSink[T] =>
+ val outputType = fromDataTypeToLegacyInfo(tableSink.getConsumedDataType)
+ .asInstanceOf[TypeInformation[T]]
+ // translate the Table into a DataSet and provide the type that the TableSink expects.
+ val result: DataSet[T] = translate(optimizedNode, tableSchema)(outputType)
+ // create a dummy NoOpOperator, which holds DummyExecutionEnvironment as context.
+ // NoOpOperator will be ignored in OperatorTranslation
+ // when translating DataSet to Operator, while its input can be translated normally.
+ val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType)
+ // use the OutputFormat to consume the DataSet.
+ val dataSink = dummyOp.output(boundedSink.getOutputFormat)
+ dataSink.name(
+ TableConnectorUtils.generateRuntimeName(
+ boundedSink.getClass,
+ boundedSink.getTableSchema.getFieldNames))
+ case _ =>
+ throw new TableException(
+ "BatchTableSink or OutputFormatTableSink required to emit batch Table.")
+ }
+ }
+
+ /**
* Translates a [[Table]] into a [[DataSet]].
*
* The transformation involves optimizing the relational expression tree as defined by
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index f40b75d..3a97106 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -37,7 +37,7 @@ import org.apache.flink.table.operations.ddl._
import org.apache.flink.table.operations.utils.OperationTreeBuilder
import org.apache.flink.table.operations.{CatalogQueryOperation, TableSourceQueryOperation, _}
import org.apache.flink.table.planner.{ParserImpl, PlanningConfigurationBuilder}
-import org.apache.flink.table.sinks.{OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils}
+import org.apache.flink.table.sinks.{BatchTableSink, OutputFormatTableSink, OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils}
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.types.DataType
import org.apache.flink.table.util.JavaScalaConversionUtil
@@ -878,11 +878,11 @@ abstract class TableEnvImpl(
tableSink: TableSink[T]): DataSink[_]
/**
- * Add the given [[DataSink]] into the buffer.
+ * Add the given [[ModifyOperation]] into the buffer.
*
- * @param dataSink The [[DataSink]] to add the buffer to.
+ * @param modifyOperation The [[ModifyOperation]] to add to the buffer.
*/
- protected def addToBuffer(dataSink: DataSink[_]): Unit
+ protected def addToBuffer[T](modifyOperation: ModifyOperation): Unit
override def insertInto(path: String, table: Table): Unit = {
val parser = planningConfigurationBuilder.createCalciteParser()
@@ -919,15 +919,64 @@ abstract class TableEnvImpl(
table: Table,
insertOptions: InsertOptions,
sinkIdentifier: ObjectIdentifier): Unit = {
- val dataSink = writeToSinkAndTranslate(table.getQueryOperation, insertOptions, sinkIdentifier)
- addToBuffer(dataSink)
+ val operation = new CatalogSinkModifyOperation(
+ sinkIdentifier,
+ table.getQueryOperation,
+ insertOptions.staticPartitions,
+ insertOptions.overwrite,
+ new JHashMap[String, String]())
+ addToBuffer(operation)
}
override def getParser: Parser = parser
override def getCatalogManager: CatalogManager = catalogManager
- private def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
+ protected def getTableSink(modifyOperation: ModifyOperation): TableSink[_] = {
+ modifyOperation match {
+ case s: CatalogSinkModifyOperation =>
+ getTableSink(s.getTableIdentifier) match {
+ case None =>
+ throw new TableException(
+ s"No table was registered under the name ${s.getTableIdentifier}.")
+
+ case Some(tableSink) =>
+ tableSink match {
+ case _: BatchTableSink[_] => // do nothing
+ case _: OutputFormatTableSink[_] => // do nothing
+ case _ =>
+ throw new TableException(
+ "BatchTableSink or OutputFormatTableSink required to emit batch Table.")
+ }
+ // validate schema of source table and table sink
+ TableSinkUtils.validateSink(
+ s.getStaticPartitions,
+ s.getChild,
+ s.getTableIdentifier,
+ tableSink)
+ // set static partitions if it is a partitioned table sink
+ tableSink match {
+ case partitionableSink: PartitionableTableSink =>
+ partitionableSink.setStaticPartition(s.getStaticPartitions)
+ case _ =>
+ }
+ // set whether to overwrite if it's an OverwritableTableSink
+ tableSink match {
+ case overwritableTableSink: OverwritableTableSink =>
+ overwritableTableSink.setOverwrite(s.isOverwrite)
+ case _ =>
+ require(!s.isOverwrite, "INSERT OVERWRITE requires " +
+ s"${classOf[OverwritableTableSink].getSimpleName} but actually got " +
+ tableSink.getClass.getName)
+ }
+ tableSink
+ }
+ case o =>
+ throw new TableException("Unsupported Operation: " + o.asSummaryString())
+ }
+ }
+
+ protected def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier))
.map(_.getTable) match {
case Some(s) if s.isInstanceOf[ConnectorCatalogTable[_, _]] =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala
new file mode 100644
index 0000000..f3fd194
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.operators.DataSink
+import org.apache.flink.table.api.internal.BatchTableEnvImpl
+import org.apache.flink.table.plan.nodes.Sink
+import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
+import org.apache.flink.types.Row
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+
+/**
+ * A special [[DataSetRel]] which makes the explain result prettier.
+ *
+ * <p>NOTES: We can't move the [[BatchTableSink#consumeDataSet]]/[[DataSet#output]] logic
+ * from [[BatchTableEnvImpl]] to this node, because the return types of
+ * [[DataSetRel#translateToPlan]] (which returns [[DataSet]]) and
+ * [[BatchTableSink#consumeDataSet]]/[[DataSet#output]] (which returns [[DataSink]]) are
+ * different. [[DataSetSink#translateToPlan]] just returns the input's translated result.
+ */
+class DataSetSink(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputRel: RelNode,
+ sink: TableSink[_],
+ sinkName: String)
+ extends Sink(cluster, traitSet, inputRel, sink, sinkName)
+ with DataSetRel {
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataSetSink(cluster, traitSet, inputs.get(0), sink, sinkName)
+ }
+
+ override def translateToPlan(tableEnv: BatchTableEnvImpl): DataSet[Row] = {
+ getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ }
+
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a872fb4..50a1bca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -220,7 +220,8 @@ object FlinkRuleSets {
DataSetValuesRule.INSTANCE,
DataSetCorrelateRule.INSTANCE,
DataSetPythonCorrelateRule.INSTANCE,
- BatchTableSourceScanRule.INSTANCE
+ BatchTableSourceScanRule.INSTANCE,
+ DataSetSinkRule.INSTANCE
)
/**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala
new file mode 100644
index 0000000..b7786b0
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.dataset.DataSetSink
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+class DataSetSinkRule
+ extends ConverterRule(
+ classOf[FlinkLogicalSink],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.DATASET,
+ "DataSetSinkRule") {
+
+ def convert(rel: RelNode): RelNode = {
+ val sink: FlinkLogicalSink = rel.asInstanceOf[FlinkLogicalSink]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
+ val convInput: RelNode = RelOptRule.convert(sink.getInput(0), FlinkConventions.DATASET)
+
+ new DataSetSink(
+ rel.getCluster,
+ traitSet,
+ convInput,
+ sink.sink,
+ sink.sinkName
+ )
+ }
+}
+
+object DataSetSinkRule {
+ val INSTANCE = new DataSetSinkRule
+}
+
+
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index a2df711..fc33f8d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -18,20 +18,22 @@
package org.apache.flink.table.api.batch
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl
-import org.apache.flink.table.utils.TableTestUtil.batchTableNode
+import org.apache.flink.table.api.{Table, Types}
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, readFromResource, replaceStageId}
import org.apache.flink.test.util.MultipleProgramsTestBase
+
import org.junit.Assert.assertEquals
import org.junit._
class ExplainTest
extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
- private val testFilePath = ExplainTest.this.getClass.getResource("/").getFile
-
@Test
def testFilterWithoutExtended(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
@@ -41,8 +43,7 @@ class ExplainTest
val table = scan.filter($"a" % 2 === 0)
val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testFilter0.out").mkString
+ val source = readFromResource("testFilter0.out")
val expected = replaceString(source, scan)
assertEquals(expected, result)
@@ -58,8 +59,7 @@ class ExplainTest
val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
.explain(table, extended = true).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testFilter1.out").mkString
+ val source = readFromResource("testFilter1.out")
val expected = replaceString(source, scan)
assertEquals(expected, result)
@@ -75,8 +75,7 @@ class ExplainTest
val table = table1.join(table2).where($"b" === $"d").select($"a", $"c")
val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testJoin0.out").mkString
+ val source = readFromResource("testJoin0.out")
val expected = replaceString(source, table1, table2)
assertEquals(expected, result)
@@ -93,8 +92,7 @@ class ExplainTest
val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
.explain(table, extended = true).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testJoin1.out").mkString
+ val source = readFromResource("testJoin1.out")
val expected = replaceString(source, table1, table2)
assertEquals(expected, result)
@@ -110,8 +108,7 @@ class ExplainTest
val table = table1.unionAll(table2)
val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testUnion0.out").mkString
+ val source = readFromResource("testUnion0.out")
val expected = replaceString(source, table1, table2)
assertEquals(expected, result)
@@ -128,13 +125,51 @@ class ExplainTest
val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl]
.explain(table, extended = true).replaceAll("\\r\\n", "\n")
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testUnion1.out").mkString
+ val source = readFromResource("testUnion1.out")
val expected = replaceString(source, table1, table2)
assertEquals(expected, result)
}
+ @Test
+ def testInsert(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = BatchTableEnvironment.create(env)
+
+ tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))
+
+ tEnv.sqlUpdate("insert into targetTable select first, id from sourceTable")
+
+ val result = tEnv.explain(false)
+ val expected = readFromResource("testInsert1.out")
+ assertEquals(replaceStageId(expected), replaceStageId(result))
+ }
+
+ @Test
+ def testMultipleInserts(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = BatchTableEnvironment.create(env)
+
+ tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable1", sink.configure(fieldNames, fieldTypes))
+ tEnv.registerTableSink("targetTable2", sink.configure(fieldNames, fieldTypes))
+
+ tEnv.sqlUpdate("insert into targetTable1 select first, id from sourceTable")
+ tEnv.sqlUpdate("insert into targetTable2 select last, id from sourceTable")
+
+ val result = tEnv.explain(false)
+ val expected = readFromResource("testMultipleInserts1.out")
+ assertEquals(replaceStageId(expected), replaceStageId(result))
+ }
def replaceString(s: String, t1: Table, t2: Table): String = {
replaceSourceNode(replaceSourceNode(replaceString(s), t1, 0), t2, 1)
@@ -144,14 +179,14 @@ class ExplainTest
replaceSourceNode(replaceString(s), t, 0)
}
- private def replaceSourceNode(s: String, t: Table, idx: Int) = {
+ private def replaceSourceNode(s: String, t: Table, idx: Int): String = {
s.replace(
s"%logicalSourceNode$idx%", batchTableNode(t)
.replace("DataSetScan", "FlinkLogicalDataSetScan"))
.replace(s"%sourceNode$idx%", batchTableNode(t))
}
- def replaceString(s: String) = {
+ def replaceString(s: String): String = {
s.replaceAll("\\r\\n", "\n")
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
index d8915df..ab07039 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -41,6 +41,8 @@ class InsertIntoValidationTest extends TableTestBase {
// must fail because table sink schema has too few fields
util.tableEnv.sqlUpdate(sql)
+ // trigger validation
+ util.tableEnv.execute("test")
}
@Test(expected = classOf[ValidationException])
@@ -57,6 +59,8 @@ class InsertIntoValidationTest extends TableTestBase {
// must fail because types of table sink do not match query result
util.tableEnv.sqlUpdate(sql)
+ // trigger validation
+ util.tableEnv.execute("test")
}
@Test(expected = classOf[ValidationException])
@@ -73,6 +77,8 @@ class InsertIntoValidationTest extends TableTestBase {
// must fail because partial insert is not supported yet.
util.tableEnv.sqlUpdate(sql)
+ // trigger validation
+ util.tableEnv.execute("test")
}
@Test
@@ -92,5 +98,7 @@ class InsertIntoValidationTest extends TableTestBase {
val sql = "INSERT INTO targetTable SELECT a, b FROM sourceTable"
util.tableEnv.sqlUpdate(sql)
+ // trigger validation
+ util.tableEnv.execute("test")
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
index 4ae77f9..210a598 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
@@ -41,6 +41,8 @@ class InsertIntoValidationTest extends TableTestBase {
util.tableEnv.scan("sourceTable")
.select('a, 'b, 'c)
.insertInto("targetTable")
+ // trigger validation
+ util.tableEnv.execute("test")
}
@Test(expected = classOf[ValidationException])
@@ -57,5 +59,7 @@ class InsertIntoValidationTest extends TableTestBase {
util.tableEnv.scan("sourceTable")
.select('a, 'b, 'c)
.insertInto("targetTable")
+ // trigger validation
+ util.tableEnv.execute("test")
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
new file mode 100644
index 0000000..9b78a10
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+ LogicalProject(first=[$0], id=[$1])
+ LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+ BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : to: Row
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : UnsafeMemoryAppendTableSink(d, e)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out
new file mode 100644
index 0000000..c8979bd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out
@@ -0,0 +1,51 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable1`], fields=[d, e])
+ LogicalProject(first=[$0], id=[$1])
+ LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable2`], fields=[d, e])
+ LogicalProject(last=[$3], id=[$1])
+ LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable1`], fields=[d, e])
+ BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable2`], fields=[d, e])
+ BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[last, id], source=[CsvTableSource(read fields: last, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : to: Row
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : UnsafeMemoryAppendTableSink(d, e)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : to: Row
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : UnsafeMemoryAppendTableSink(d, e)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+
[flink] 01/05: [FLINK-17267] [table-planner] legacy stream planner
supports explain insert operation
Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 59ca355f05b668f1cb41cae02ce182ccc7c4d707
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Apr 24 15:22:09 2020 +0800
[FLINK-17267] [table-planner] legacy stream planner supports explain insert operation
---
.../table/calcite/RelTimeIndicatorConverter.scala | 30 +++
.../flink/table/plan/nodes/LogicalSink.scala | 55 +++++
.../org/apache/flink/table/plan/nodes/Sink.scala | 57 +++++
.../plan/nodes/datastream/DataStreamSink.scala | 204 ++++++++++++++++
.../plan/nodes/logical/FlinkLogicalSink.scala | 73 ++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 6 +-
.../plan/rules/datastream/DataStreamSinkRule.scala | 53 +++++
.../apache/flink/table/planner/StreamPlanner.scala | 263 ++++++---------------
.../flink/table/sinks/DataStreamTableSink.scala | 58 +++++
.../flink/table/api/TableEnvironmentITCase.scala | 4 +-
.../flink/table/api/stream/ExplainTest.scala | 75 ++++--
.../apache/flink/table/utils/TableTestBase.scala | 10 +
.../resources/testFromToDataStreamAndSqlUpdate.out | 23 +-
.../src/test/scala/resources/testInsert.out | 29 +++
.../test/scala/resources/testMultipleInserts.out | 55 +++++
.../resources/testSqlUpdateAndToDataStream.out | 20 +-
16 files changed, 785 insertions(+), 230 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 7f85245..82b3f2b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -32,6 +32,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType,
import org.apache.flink.table.catalog.BasicOperatorTable
import org.apache.flink.table.functions.sql.ProctimeSqlFunction
import org.apache.flink.table.plan.logical.rel._
+import org.apache.flink.table.plan.nodes.LogicalSink
import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
import java.util.{Collections => JCollections}
@@ -173,6 +174,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
case temporalTableJoin: LogicalTemporalTableJoin =>
visit(temporalTableJoin)
+ case sink: LogicalSink =>
+ var newInput = sink.getInput.accept(this)
+ var needsConversion = false
+
+ val projects = newInput.getRowType.getFieldList.map { field =>
+ if (isProctimeIndicatorType(field.getType)) {
+ needsConversion = true
+ rexBuilder.makeCall(ProctimeSqlFunction, new RexInputRef(field.getIndex, field.getType))
+ } else {
+ new RexInputRef(field.getIndex, field.getType)
+ }
+ }
+
+ // add final conversion if necessary
+ if (needsConversion) {
+ newInput = LogicalProject.create(newInput, projects, newInput.getRowType.getFieldNames)
+ }
+ new LogicalSink(
+ sink.getCluster,
+ sink.getTraitSet,
+ newInput,
+ sink.sink,
+ sink.sinkName)
+
case _ =>
throw new TableException(s"Unsupported logical operator: ${other.getClass.getSimpleName}")
}
@@ -392,6 +417,11 @@ object RelTimeIndicatorConverter {
val converter = new RelTimeIndicatorConverter(rexBuilder)
val convertedRoot = rootRel.accept(converter)
+ // the LogicalSink is converted in RelTimeIndicatorConverter before
+ if (rootRel.isInstanceOf[LogicalSink]) {
+ return convertedRoot
+ }
+
var needsConversion = false
// materialize remaining proctime indicators
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/LogicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/LogicalSink.scala
new file mode 100644
index 0000000..478bb85
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/LogicalSink.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Sub-class of [[Sink]] that is a relational expression
+ * which writes out data of input node into a [[TableSink]].
+ * This class corresponds to Calcite logical rel.
+ */
+final class LogicalSink(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ input: RelNode,
+ sink: TableSink[_],
+ sinkName: String)
+ extends Sink(cluster, traitSet, input, sink, sinkName) {
+
+ override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+ new LogicalSink(cluster, traitSet, inputs.head, sink, sinkName)
+ }
+
+}
+
+object LogicalSink {
+
+ def create(input: RelNode, sink: TableSink[_], sinkName: String): LogicalSink = {
+ val traits = input.getCluster.traitSetOf(Convention.NONE)
+ new LogicalSink(input.getCluster, traits, input, sink, sinkName)
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/Sink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/Sink.scala
new file mode 100644
index 0000000..cd873d1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/Sink.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+
+/**
+ * Relational expression that writes out data of input node into a [[TableSink]].
+ *
+ * @param cluster cluster that this relational expression belongs to
+ * @param traitSet the traits of this rel
+ * @param input input relational expression
+ * @param sink Table sink to write into
+ * @param sinkName Name of tableSink, which is not a required property, that is, it could be null
+ */
+abstract class Sink(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ input: RelNode,
+ val sink: TableSink[_],
+ val sinkName: String)
+ extends SingleRel(cluster, traitSet, input) {
+
+ override def deriveRowType(): RelDataType = {
+ val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+ val tableSchema = sink.getTableSchema
+ typeFactory.buildLogicalRowType(tableSchema.getFieldNames, tableSchema.getFieldTypes)
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .itemIf("name", sinkName, sinkName != null)
+ .item("fields", sink.getTableSchema.getFieldNames.mkString(", "))
+ }
+
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSink.scala
new file mode 100644
index 0000000..6f2df68
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSink.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink => StreamingDataStreamSink}
+import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.Sink
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
+import org.apache.flink.table.planner.{DataStreamConversions, StreamPlanner}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.sinks.{AppendStreamTableSink, DataStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
+import org.apache.flink.table.types.utils.TypeConversions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+
+import _root_.java.lang.{Boolean => JBool}
+
+import _root_.scala.collection.JavaConverters._
+
+
+/**
+ * Stream physical RelNode to write data into an external sink defined by a [[TableSink]].
+ */
+class DataStreamSink(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputRel: RelNode,
+ sink: TableSink[_],
+ sinkName: String)
+ extends Sink(cluster, traitSet, inputRel, sink, sinkName)
+ with DataStreamRel {
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataStreamSink(cluster, traitSet, inputs.get(0), sink, sinkName)
+ }
+
+ override def translateToPlan(planner: StreamPlanner): DataStream[CRow] = {
+ val inputTransform = writeToSink(planner).asInstanceOf[Transformation[CRow]]
+ new DataStream(planner.getExecutionEnvironment, inputTransform)
+ }
+
+ private def writeToSink[T](planner: StreamPlanner): Transformation[_] = {
+ sink match {
+ case retractSink: RetractStreamTableSink[T] =>
+ val resultSink = writeToRetractSink(retractSink, planner)
+ resultSink.getTransformation
+
+ case upsertSink: UpsertStreamTableSink[T] =>
+ val resultSink = writeToUpsertSink(upsertSink, planner)
+ resultSink.getTransformation
+
+ case appendSink: AppendStreamTableSink[T] =>
+ val resultSink = writeToAppendSink(appendSink, planner)
+ resultSink.getTransformation
+
+ case dataStreamTableSink: DataStreamTableSink[_] =>
+ // if no change flags are requested, verify table is an insert-only (append-only) table.
+ if (!dataStreamTableSink.withChangeFlag && !UpdatingPlanChecker.isAppendOnly(getInput)) {
+ throw new ValidationException(
+ "Table is not an append-only table. " +
+ "Use the toRetractStream() in order to handle add and retract messages.")
+ }
+ val dataStream = translateInput(
+ planner,
+ getTableSchema,
+ dataStreamTableSink.getOutputType,
+ dataStreamTableSink.withChangeFlag)
+ dataStream.getTransformation
+
+ case _ =>
+ throw new ValidationException("Stream Tables can only be emitted by AppendStreamTableSink, "
+ + "RetractStreamTableSink, or UpsertStreamTableSink.")
+ }
+ }
+
+ private def writeToRetractSink[T](
+ sink: RetractStreamTableSink[T],
+ planner: StreamPlanner): StreamingDataStreamSink[_]= {
+ // retraction sink can always be used
+ val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
+ .asInstanceOf[TypeInformation[JTuple2[JBool, T]]]
+ // translate the Table into a DataStream and provide the type that the TableSink expects.
+ val result: DataStream[JTuple2[JBool, T]] =
+ translateToType(
+ planner,
+ withChangeFlag = true,
+ outputType)
+ // Give the DataStream to the TableSink to emit it.
+ sink.consumeDataStream(result)
+ }
+
+ private def writeToAppendSink[T](
+ sink: AppendStreamTableSink[T],
+ planner: StreamPlanner): StreamingDataStreamSink[_]= {
+ // verify table is an insert-only (append-only) table
+ if (!UpdatingPlanChecker.isAppendOnly(getInput)) {
+ throw new TableException(
+ "AppendStreamTableSink requires that Table has only insert changes.")
+ }
+ val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
+ .asInstanceOf[TypeInformation[T]]
+ val resultType = getTableSchema
+ // translate the Table into a DataStream and provide the type that the TableSink expects.
+ val result: DataStream[T] =
+ translateInput(
+ planner,
+ resultType,
+ outputType,
+ withChangeFlag = false)
+ // Give the DataStream to the TableSink to emit it.
+ sink.consumeDataStream(result)
+ }
+
+ private def writeToUpsertSink[T](
+ sink: UpsertStreamTableSink[T],
+ planner: StreamPlanner): StreamingDataStreamSink[_] = {
+ // optimize plan
+ // check for append only table
+ val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(getInput)
+ sink.setIsAppendOnly(isAppendOnlyTable)
+ // extract unique key fields
+ val sinkFieldNames = sink.getTableSchema.getFieldNames
+ val tableKeys: Option[Array[String]] = UpdatingPlanChecker
+ .getUniqueKeyFields(getInput, sinkFieldNames)
+ // check that we have keys if the table has changes (is not append-only)
+ tableKeys match {
+ case Some(keys) => sink.setKeyFields(keys)
+ case None if isAppendOnlyTable => sink.setKeyFields(null)
+ case None if !isAppendOnlyTable => throw new TableException(
+ "UpsertStreamTableSink requires that Table has full primary keys if it is updated.")
+ }
+ val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
+ .asInstanceOf[TypeInformation[JTuple2[JBool, T]]]
+ val resultType = getTableSchema
+ // translate the Table into a DataStream and provide the type that the TableSink expects.
+ val result: DataStream[JTuple2[JBool, T]] =
+ translateInput(
+ planner,
+ resultType,
+ outputType,
+ withChangeFlag = true)
+ // Give the DataStream to the TableSink to emit it.
+ sink.consumeDataStream(result)
+ }
+
+ private def translateToType[A](
+ planner: StreamPlanner,
+ withChangeFlag: Boolean,
+ tpe: TypeInformation[A]): DataStream[A] = {
+ val rowType = getTableSchema
+
+ // if no change flags are requested, verify table is an insert-only (append-only) table.
+ if (!withChangeFlag && !UpdatingPlanChecker.isAppendOnly(input)) {
+ throw new ValidationException(
+ "Table is not an append-only table. " +
+ "Use the toRetractStream() in order to handle add and retract messages.")
+ }
+
+ // get CRow plan
+ translateInput(planner, rowType, tpe, withChangeFlag)
+ }
+
+ private def translateInput[A](
+ planner: StreamPlanner,
+ logicalSchema: TableSchema,
+ tpe: TypeInformation[A],
+ withChangeFlag: Boolean): DataStream[A] = {
+ val dataStream = getInput().asInstanceOf[DataStreamRel].translateToPlan(planner)
+ DataStreamConversions.convert(dataStream, logicalSchema, withChangeFlag, tpe, planner.getConfig)
+ }
+
+ /**
+ * Returns the record type of the optimized plan with field names of the logical plan.
+ */
+ private def getTableSchema: TableSchema = {
+ val fieldTypes = getInput.getRowType.getFieldList.asScala.map(_.getType)
+ .map(FlinkTypeFactory.toTypeInfo)
+ .map(TypeConversions.fromLegacyInfoToDataType)
+ .toArray
+ TableSchema.builder().fields(sink.getTableSchema.getFieldNames, fieldTypes).build()
+ }
+
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala
new file mode 100644
index 0000000..2e671ac
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.{FlinkConventions, LogicalSink, Sink}
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Sub-class of [[Sink]] that is a relational expression
+ * which writes out data of input node into a [[TableSink]].
+ */
+class FlinkLogicalSink(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ input: RelNode,
+ sink: TableSink[_],
+ sinkName: String)
+ extends Sink(cluster, traitSet, input, sink, sinkName)
+ with FlinkLogicalRel {
+
+ override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+ new FlinkLogicalSink(cluster, traitSet, inputs.head, sink, sinkName)
+ }
+
+}
+
+private class FlinkLogicalSinkConverter
+ extends ConverterRule(
+ classOf[LogicalSink],
+ Convention.NONE,
+ FlinkConventions.LOGICAL,
+ "FlinkLogicalSinkConverter") {
+
+ override def convert(rel: RelNode): RelNode = {
+ val sink = rel.asInstanceOf[LogicalSink]
+ val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)
+ FlinkLogicalSink.create(newInput, sink.sink, sink.sinkName)
+ }
+}
+
+object FlinkLogicalSink {
+ val CONVERTER: ConverterRule = new FlinkLogicalSinkConverter()
+
+ def create(input: RelNode, sink: TableSink[_], sinkName: String): FlinkLogicalSink = {
+ val cluster = input.getCluster
+ val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
+ new FlinkLogicalSink(cluster, traitSet, input, sink, sinkName)
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 964dd04..a872fb4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -146,7 +146,8 @@ object FlinkRuleSets {
FlinkLogicalTableFunctionScan.CONVERTER,
FlinkLogicalMatch.CONVERTER,
FlinkLogicalTableAggregate.CONVERTER,
- FlinkLogicalWindowTableAggregate.CONVERTER
+ FlinkLogicalWindowTableAggregate.CONVERTER,
+ FlinkLogicalSink.CONVERTER
)
/**
@@ -265,7 +266,8 @@ object FlinkRuleSets {
DataStreamTableAggregateRule.INSTANCE,
DataStreamGroupWindowTableAggregateRule.INSTANCE,
DataStreamPythonCalcRule.INSTANCE,
- DataStreamPythonCorrelateRule.INSTANCE
+ DataStreamPythonCorrelateRule.INSTANCE,
+ DataStreamSinkRule.INSTANCE
)
/**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSinkRule.scala
new file mode 100644
index 0000000..6d02308
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSinkRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSink
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+class DataStreamSinkRule
+ extends ConverterRule(
+ classOf[FlinkLogicalSink],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.DATASTREAM,
+ "DataStreamSinkRule") {
+
+ def convert(rel: RelNode): RelNode = {
+ val sink: FlinkLogicalSink = rel.asInstanceOf[FlinkLogicalSink]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+ val convInput: RelNode = RelOptRule.convert(sink.getInput(0), FlinkConventions.DATASTREAM)
+
+ new DataStreamSink(
+ rel.getCluster,
+ traitSet,
+ convInput,
+ sink.sink,
+ sink.sinkName
+ )
+ }
+}
+
+object DataStreamSinkRule {
+ val INSTANCE = new DataStreamSinkRule
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 659bcfe..f549168 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -18,10 +18,8 @@
package org.apache.flink.table.planner
import org.apache.flink.annotation.VisibleForTesting
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
+import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.calcite._
@@ -34,8 +32,8 @@ import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryConte
import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
import org.apache.flink.table.operations._
import org.apache.flink.table.plan.StreamOptimizer
+import org.apache.flink.table.plan.nodes.LogicalSink
import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
-import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.sinks._
import org.apache.flink.table.types.utils.TypeConversions
@@ -46,7 +44,6 @@ import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
-import _root_.java.lang.{Boolean => JBool}
import _root_.java.util
import _root_.java.util.Objects
import _root_.java.util.function.{Supplier => JSupplier}
@@ -111,20 +108,55 @@ class StreamPlanner(
override def getParser: Parser = parser
- override def translate(tableOperations: util.List[ModifyOperation])
- : util.List[Transformation[_]] = {
- tableOperations.asScala.map(translate).filter(Objects.nonNull).asJava
+ override def translate(
+ tableOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
+ val planner = createDummyPlanner()
+ tableOperations.asScala.map { operation =>
+ val (ast, updatesAsRetraction) = translateToRel(operation)
+ val optimizedPlan = optimizer.optimize(ast, updatesAsRetraction, getRelBuilder)
+ val dataStream = translateToCRow(planner, optimizedPlan)
+ dataStream.getTransformation.asInstanceOf[Transformation[_]]
+ }.filter(Objects.nonNull).asJava
}
override def explain(operations: util.List[Operation], extended: Boolean): String = {
- operations.asScala.map {
+ require(operations.asScala.nonEmpty, "operations should not be empty")
+ val astWithUpdatesAsRetractionTuples = operations.asScala.map {
case queryOperation: QueryOperation =>
- explain(queryOperation)
+ (getRelBuilder.tableOperation(queryOperation).build(), false)
case modifyOperation: ModifyOperation =>
- explain(modifyOperation.getChild)
- case operation =>
- throw new TableException(s"${operation.getClass.getCanonicalName} is not supported")
- }.mkString(s"${System.lineSeparator}${System.lineSeparator}")
+ translateToRel(modifyOperation)
+ case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
+ }
+
+ val optimizedNodes = astWithUpdatesAsRetractionTuples.map {
+ case (ast, updatesAsRetraction) =>
+ optimizer.optimize(ast, updatesAsRetraction, getRelBuilder)
+ }
+
+ val planner = createDummyPlanner()
+ val dataStreams = optimizedNodes.map(p => translateToCRow(planner, p))
+
+ val astPlan = astWithUpdatesAsRetractionTuples.map {
+ p => RelOptUtil.toString(p._1)
+ }.mkString(System.lineSeparator)
+ val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator)
+
+ val env = dataStreams.head.getExecutionEnvironment
+ val jsonSqlPlan = env.getExecutionPlan
+ val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
+
+ s"== Abstract Syntax Tree ==" +
+ System.lineSeparator +
+ s"$astPlan" +
+ System.lineSeparator +
+ s"== Optimized Logical Plan ==" +
+ System.lineSeparator +
+ s"$optimizedPlan" +
+ System.lineSeparator +
+ s"== Physical Execution Plan ==" +
+ System.lineSeparator +
+ s"$sqlPlan"
}
override def getCompletionHints(
@@ -135,11 +167,10 @@ class StreamPlanner(
planner.getCompletionHints(statement, position)
}
- private def translate(tableOperation: ModifyOperation)
- : Transformation[_] = {
- tableOperation match {
- case s : UnregisteredSinkModifyOperation[_] =>
- writeToSink(s.getChild, s.getSink)
+ private def translateToRel(modifyOperation: ModifyOperation): (RelNode, Boolean) = {
+ modifyOperation match {
+ case s: UnregisteredSinkModifyOperation[_] =>
+ writeToSink(s.getChild, s.getSink, "UnregisteredSink")
case catalogSink: CatalogSinkModifyOperation =>
getTableSink(catalogSink.getTableIdentifier)
@@ -164,7 +195,10 @@ class StreamPlanner(
s"${classOf[OverwritableTableSink].getSimpleName} but actually got " +
sink.getClass.getName)
}
- writeToSink(catalogSink.getChild, sink)
+ writeToSink(
+ catalogSink.getChild,
+ sink,
+ catalogSink.getTableIdentifier.asSummaryString())
}) match {
case Some(t) => t
case None =>
@@ -178,41 +212,19 @@ class StreamPlanner(
case UpdateMode.UPSERT => (false, true)
}
- translateToType(
- tableOperation.getChild,
- isRetract,
- withChangeFlag,
- TypeConversions.fromDataTypeToLegacyInfo(outputConversion.getType)).getTransformation
+ val tableSink = new DataStreamTableSink(
+ outputConversion.getChild.getTableSchema,
+ TypeConversions.fromDataTypeToLegacyInfo(outputConversion.getType),
+ withChangeFlag)
+ val input = getRelBuilder.tableOperation(modifyOperation.getChild).build()
+ val sink = LogicalSink.create(input, tableSink, "DataStreamTableSink")
+ (sink, isRetract)
case _ =>
- throw new TableException(s"Unsupported ModifyOperation: $tableOperation")
+ throw new TableException(s"Unsupported ModifyOperation: $modifyOperation")
}
}
- private def explain(tableOperation: QueryOperation) = {
- val ast = getRelBuilder.tableOperation(tableOperation).build()
- val optimizedPlan = optimizer
- .optimize(ast, updatesAsRetraction = false, getRelBuilder)
- val dataStream = translateToCRow(optimizedPlan)
-
- val env = dataStream.getExecutionEnvironment
- val jsonSqlPlan = env.getExecutionPlan
-
- val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
-
- s"== Abstract Syntax Tree ==" +
- System.lineSeparator +
- s"${RelOptUtil.toString(ast)}" +
- System.lineSeparator +
- s"== Optimized Logical Plan ==" +
- System.lineSeparator +
- s"${RelOptUtil.toString(optimizedPlan)}" +
- System.lineSeparator +
- s"== Physical Execution Plan ==" +
- System.lineSeparator +
- s"$sqlPlan"
- }
-
private def getFlinkPlanner: FlinkPlannerImpl = {
val currentCatalogName = catalogManager.getCurrentCatalog
val currentDatabase = catalogManager.getCurrentDatabase
@@ -232,9 +244,7 @@ class StreamPlanner(
private[flink] def getExecutionEnvironment: StreamExecutionEnvironment =
executor.asInstanceOf[StreamExecutor].getExecutionEnvironment
- private def translateToCRow(logicalPlan: RelNode): DataStream[CRow] = {
- val planner = createDummyPlanner()
-
+ private def translateToCRow(planner: StreamPlanner, logicalPlan: RelNode): DataStream[CRow] = {
logicalPlan match {
case node: DataStreamRel =>
getExecutionEnvironment.configure(
@@ -249,163 +259,38 @@ class StreamPlanner(
private def writeToSink[T](
tableOperation: QueryOperation,
- sink: TableSink[T])
- : Transformation[_] = {
+ sink: TableSink[T],
+ sinkName: String): (RelNode, Boolean) = {
- val resultSink = sink match {
+ val updatesAsRetraction = sink match {
case retractSink: RetractStreamTableSink[T] =>
retractSink match {
case _: PartitionableTableSink =>
throw new TableException("Partitionable sink in retract stream mode " +
"is not supported yet!")
- case _ =>
+ case _ => // do nothing
}
- writeToRetractSink(retractSink, tableOperation)
+ true
case upsertSink: UpsertStreamTableSink[T] =>
upsertSink match {
case _: PartitionableTableSink =>
throw new TableException("Partitionable sink in upsert stream mode " +
"is not supported yet!")
- case _ =>
+ case _ => // do nothing
}
- writeToUpsertSink(upsertSink, tableOperation)
+ false
- case appendSink: AppendStreamTableSink[T] =>
- writeToAppendSink(appendSink, tableOperation)
+ case _: AppendStreamTableSink[T] =>
+ false
case _ =>
throw new ValidationException("Stream Tables can only be emitted by AppendStreamTableSink, "
+ "RetractStreamTableSink, or UpsertStreamTableSink.")
}
- if (resultSink != null) {
- resultSink.getTransformation
- } else {
- null
- }
- }
-
- private def writeToRetractSink[T](
- sink: RetractStreamTableSink[T],
- tableOperation: QueryOperation)
- : DataStreamSink[_]= {
- // retraction sink can always be used
- val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
- .asInstanceOf[TypeInformation[JTuple2[JBool, T]]]
- // translate the Table into a DataStream and provide the type that the TableSink expects.
- val result: DataStream[JTuple2[JBool, T]] =
- translateToType(
- tableOperation,
- updatesAsRetraction = true,
- withChangeFlag = true,
- outputType)
- // Give the DataStream to the TableSink to emit it.
- sink.consumeDataStream(result)
- }
-
- private def writeToAppendSink[T](
- sink: AppendStreamTableSink[T],
- tableOperation: QueryOperation)
- : DataStreamSink[_]= {
- // optimize plan
- val relNode = getRelBuilder.tableOperation(tableOperation).build()
- val optimizedPlan = optimizer.optimize(relNode, updatesAsRetraction = false, getRelBuilder)
- // verify table is an insert-only (append-only) table
- if (!UpdatingPlanChecker.isAppendOnly(optimizedPlan)) {
- throw new TableException(
- "AppendStreamTableSink requires that Table has only insert changes.")
- }
- val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
- .asInstanceOf[TypeInformation[T]]
- val resultType = getTableSchema(tableOperation.getTableSchema.getFieldNames, optimizedPlan)
- // translate the Table into a DataStream and provide the type that the TableSink expects.
- val result: DataStream[T] =
- translateOptimized(
- optimizedPlan,
- resultType,
- outputType,
- withChangeFlag = false)
- // Give the DataStream to the TableSink to emit it.
- sink.consumeDataStream(result)
- }
-
- private def writeToUpsertSink[T](
- sink: UpsertStreamTableSink[T],
- tableOperation: QueryOperation)
- : DataStreamSink[_] = {
- // optimize plan
- val relNode = getRelBuilder.tableOperation(tableOperation).build()
- val optimizedPlan = optimizer.optimize(relNode, updatesAsRetraction = false, getRelBuilder)
- // check for append only table
- val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(optimizedPlan)
- sink.setIsAppendOnly(isAppendOnlyTable)
- // extract unique key fields
- val sinkFieldNames = sink.getTableSchema.getFieldNames
- val tableKeys: Option[Array[String]] = UpdatingPlanChecker
- .getUniqueKeyFields(optimizedPlan, sinkFieldNames)
- // check that we have keys if the table has changes (is not append-only)
- tableKeys match {
- case Some(keys) => sink.setKeyFields(keys)
- case None if isAppendOnlyTable => sink.setKeyFields(null)
- case None if !isAppendOnlyTable => throw new TableException(
- "UpsertStreamTableSink requires that Table has full primary keys if it is updated.")
- }
- val outputType = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType)
- .asInstanceOf[TypeInformation[JTuple2[JBool, T]]]
- val resultType = getTableSchema(tableOperation.getTableSchema.getFieldNames, optimizedPlan)
- // translate the Table into a DataStream and provide the type that the TableSink expects.
- val result: DataStream[JTuple2[JBool, T]] =
- translateOptimized(
- optimizedPlan,
- resultType,
- outputType,
- withChangeFlag = true)
- // Give the DataStream to the TableSink to emit it.
- sink.consumeDataStream(result)
- }
-
- private def translateToType[A](
- table: QueryOperation,
- updatesAsRetraction: Boolean,
- withChangeFlag: Boolean,
- tpe: TypeInformation[A])
- : DataStream[A] = {
- val relNode = getRelBuilder.tableOperation(table).build()
- val dataStreamPlan = optimizer.optimize(relNode, updatesAsRetraction, getRelBuilder)
- val rowType = getTableSchema(table.getTableSchema.getFieldNames, dataStreamPlan)
-
- // if no change flags are requested, verify table is an insert-only (append-only) table.
- if (!withChangeFlag && !UpdatingPlanChecker.isAppendOnly(dataStreamPlan)) {
- throw new ValidationException(
- "Table is not an append-only table. " +
- "Use the toRetractStream() in order to handle add and retract messages.")
- }
-
- // get CRow plan
- translateOptimized(dataStreamPlan, rowType, tpe, withChangeFlag)
- }
-
- private def translateOptimized[A](
- optimizedPlan: RelNode,
- logicalSchema: TableSchema,
- tpe: TypeInformation[A],
- withChangeFlag: Boolean)
- : DataStream[A] = {
- val dataStream = translateToCRow(optimizedPlan)
- DataStreamConversions.convert(dataStream, logicalSchema, withChangeFlag, tpe, config)
- }
-
- /**
- * Returns the record type of the optimized plan with field names of the logical plan.
- */
- private def getTableSchema(originalNames: Array[String], optimizedPlan: RelNode): TableSchema = {
- val fieldTypes = optimizedPlan.getRowType.getFieldList.asScala.map(_.getType)
- .map(FlinkTypeFactory.toTypeInfo)
- .map(TypeConversions.fromLegacyInfoToDataType)
- .toArray
-
- TableSchema.builder().fields(originalNames, fieldTypes).build()
+ val input = getRelBuilder.tableOperation(tableOperation).build()
+ (LogicalSink.create(input, sink, sinkName), updatesAsRetraction)
}
private def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
new file mode 100644
index 0000000..ba2b132
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{TableException, TableSchema}
+import org.apache.flink.table.operations.OutputConversionModifyOperation
+
+/**
+ * A special [[TableSink]] that represents information of [[OutputConversionModifyOperation]].
+ *
+ * @param outputType The [[TypeInformation]] that specifies the type of the [[DataStream]].
+ * @param withChangeFlag Set to true to emit records with change flags.
+ * @tparam T The type of the resulting [[DataStream]].
+ */
+@Internal
+class DataStreamTableSink[T](
+ tableSchema: TableSchema,
+ outputType: TypeInformation[T],
+ val withChangeFlag: Boolean)
+ extends TableSink[T] {
+
+ /**
+ * Return the type expected by this [[TableSink]].
+ *
+ * This type should depend on the types returned by [[getTableSchema]].
+ *
+ * @return The type expected by this [[TableSink]].
+ */
+ override def getOutputType: TypeInformation[T] = outputType
+
+ override def getTableSchema: TableSchema = tableSchema
+
+ override def configure(
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
+ throw new TableException(s"configure is not supported.")
+ }
+
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index 6a43957..c5e90bf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -221,9 +221,7 @@ class TableEnvironmentITCase(tableEnvName: String) {
}
val streamEnv = ScalaStreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = ScalaStreamTableEnvironment.create(streamEnv, settings)
- val t = streamEnv.fromCollection(getPersonData)
- .toTable(streamTableEnv, 'first, 'id, 'score, 'last)
- streamTableEnv.registerTable("MyTable", t)
+ streamTableEnv.registerTableSource("MyTable", getPersonCsvTableSource)
val sink1Path = registerCsvTableSink(streamTableEnv, Array("first"), Array(STRING), "MySink1")
checkEmptyFile(sink1Path)
StreamITCase.clear
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 62f6da3..14ba09b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -18,11 +18,14 @@
package org.apache.flink.table.api.stream
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil.streamTableNode
+import org.apache.flink.table.api.{EnvironmentSettings, Table, Types}
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId, streamTableNode}
import org.apache.flink.test.util.AbstractTestBase
import org.junit.Assert.assertEquals
@@ -30,8 +33,6 @@ import org.junit._
class ExplainTest extends AbstractTestBase {
- private val testFilePath = this.getClass.getResource("/").getFile
-
@Test
def testFilter(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -41,10 +42,9 @@ class ExplainTest extends AbstractTestBase {
val scan = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
val table = scan.filter($"a" % 2 === 0)
- val result = replaceString(tEnv.explain(table))
+ val result = replaceStageId(tEnv.explain(table))
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testFilterStream0.out").mkString
+ val source = readFromResource("testFilterStream0.out")
val expect = replaceString(source, scan)
assertEquals(expect, result)
}
@@ -59,34 +59,69 @@ class ExplainTest extends AbstractTestBase {
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1.unionAll(table2)
- val result = replaceString(tEnv.explain(table))
+ val result = replaceStageId(tEnv.explain(table))
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testUnionStream0.out").mkString
+ val source = readFromResource("testUnionStream0.out")
val expect = replaceString(source, table1, table2)
assertEquals(expect, result)
}
+ @Test
+ def testInsert(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
+ val tEnv = StreamTableEnvironment.create(env, settings)
+
+ tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))
+
+ tEnv.sqlUpdate("INSERT INTO targetTable SELECT first, id FROM sourceTable")
+
+ val result = tEnv.explain(false)
+ val source = readFromResource("testInsert.out")
+ assertEquals(replaceStageId(source), replaceStageId(result))
+ }
+
+ @Test
+ def testMultipleInserts(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
+ val tEnv = StreamTableEnvironment.create(env, settings)
+
+ tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource)
+
+ val fieldNames = Array("d", "e")
+ val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.registerTableSink("targetTable1", sink.configure(fieldNames, fieldTypes))
+ tEnv.registerTableSink("targetTable2", sink.configure(fieldNames, fieldTypes))
+
+ tEnv.sqlUpdate("INSERT INTO targetTable1 SELECT first, id FROM sourceTable")
+ tEnv.sqlUpdate("INSERT INTO targetTable2 SELECT last, id FROM sourceTable")
+
+ val result = tEnv.explain(false)
+ val source = readFromResource("testMultipleInserts.out")
+ assertEquals(replaceStageId(source), replaceStageId(result))
+ }
+
def replaceString(s: String, t1: Table, t2: Table): String = {
- replaceSourceNode(replaceSourceNode(replaceString(s), t1, 0), t2, 1)
+ replaceSourceNode(replaceSourceNode(replaceStageId(s), t1, 0), t2, 1)
}
def replaceString(s: String, t: Table): String = {
- replaceSourceNode(replaceString(s), t, 0)
+ replaceSourceNode(replaceStageId(s), t, 0)
}
- private def replaceSourceNode(s: String, t: Table, idx: Int) = {
- replaceString(s)
+ private def replaceSourceNode(s: String, t: Table, idx: Int): String = {
+ replaceStageId(s)
.replace(
s"%logicalSourceNode$idx%", streamTableNode(t)
.replace("DataStreamScan", "FlinkLogicalDataStreamScan"))
.replace(s"%sourceNode$idx%", streamTableNode(t))
}
- def replaceString(s: String) = {
- /* Stage {id} is ignored, because id keeps incrementing in test class
- * while StreamExecutionEnvironment is up
- */
- s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
- }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index e2b916a..66b9c7f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -46,6 +46,7 @@ import org.junit.{ComparisonFailure, Rule}
import org.mockito.Mockito.{mock, when}
import _root_.scala.util.control.Breaks._
+import scala.io.Source
/**
* Test base for testing Table API / SQL plans.
@@ -214,6 +215,15 @@ object TableTestUtil {
s"DataStreamScan(id=[$id], fields=[${fieldNames.mkString(", ")}])"
}
+
+ def readFromResource(file: String): String = {
+ val source = s"${getClass.getResource("/").getFile}../../src/test/scala/resources/$file"
+ Source.fromFile(source).mkString
+ }
+
+ def replaceStageId(s: String): String = {
+ s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
+ }
}
case class BatchTableTestUtil(
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testFromToDataStreamAndSqlUpdate.out b/flink-table/flink-table-planner/src/test/scala/resources/testFromToDataStreamAndSqlUpdate.out
index 9cc6251..76c0638 100644
--- a/flink-table/flink-table-planner/src/test/scala/resources/testFromToDataStreamAndSqlUpdate.out
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testFromToDataStreamAndSqlUpdate.out
@@ -1,21 +1,22 @@
== Abstract Syntax Tree ==
-LogicalProject(first=[$0])
- FlinkLogicalDataStreamScan(fields=[first, id, score, last])
+LogicalSink(name=[default_catalog.default_database.MySink1], fields=[first])
+ LogicalProject(first=[$0])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
== Optimized Logical Plan ==
-DataStreamCalc(select=[first])
- DataStreamScan(fields=[first, id, score, last])
+DataStreamSink(name=[default_catalog.default_database.MySink1], fields=[first])
+ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[first], source=[CsvTableSource(read fields: first)])
== Physical Execution Plan ==
: Data Source
content : collect elements with CollectionInputFormat
: Operator
- content : from: (first, id, score, last)
+ content : CsvTableSource(read fields: first)
ship_strategy : REBALANCE
: Operator
- content : where: (>(id, 0)), select: (last)
+ content : Map
ship_strategy : FORWARD
: Operator
@@ -23,14 +24,10 @@ DataStreamCalc(select=[first])
ship_strategy : FORWARD
: Operator
- content : from: (first, id, score, last)
+ content : Map
ship_strategy : REBALANCE
- : Operator
- content : select: (first)
+ : Data Sink
+ content : Sink: CsvTableSink(first)
ship_strategy : FORWARD
- : Data Sink
- content : Sink: Unnamed
- ship_strategy : FORWARD
-
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testInsert.out b/flink-table/flink-table-planner/src/test/scala/resources/testInsert.out
new file mode 100644
index 0000000..bddbee2
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testInsert.out
@@ -0,0 +1,29 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+ LogicalProject(first=[$0], id=[$1])
+ LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+ StreamTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : CsvTableSource(read fields: first, id)
+ ship_strategy : REBALANCE
+
+ : Operator
+ content : Map
+ ship_strategy : FORWARD
+
+ : Operator
+ content : to: Row
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: UnsafeMemoryAppendTableSink(d, e)
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts.out b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts.out
new file mode 100644
index 0000000..0bb4d0f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts.out
@@ -0,0 +1,55 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.targetTable1], fields=[d, e])
+ LogicalProject(first=[$0], id=[$1])
+ LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+LogicalSink(name=[default_catalog.default_database.targetTable2], fields=[d, e])
+ LogicalProject(last=[$3], id=[$1])
+ LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.targetTable1], fields=[d, e])
+ StreamTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+DataStreamSink(name=[default_catalog.default_database.targetTable2], fields=[d, e])
+ StreamTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[last, id], source=[CsvTableSource(read fields: last, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : CsvTableSource(read fields: first, id)
+ ship_strategy : REBALANCE
+
+ : Operator
+ content : Map
+ ship_strategy : FORWARD
+
+ : Operator
+ content : to: Row
+ ship_strategy : FORWARD
+
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : CsvTableSource(read fields: last, id)
+ ship_strategy : REBALANCE
+
+ : Operator
+ content : Map
+ ship_strategy : FORWARD
+
+ : Operator
+ content : to: Row
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: UnsafeMemoryAppendTableSink(d, e)
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: UnsafeMemoryAppendTableSink(d, e)
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out b/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out
index 40fce87..76c0638 100644
--- a/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out
@@ -1,9 +1,11 @@
== Abstract Syntax Tree ==
-LogicalProject(first=[$0])
- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+LogicalSink(name=[default_catalog.default_database.MySink1], fields=[first])
+ LogicalProject(first=[$0])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
== Optimized Logical Plan ==
-StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[first], source=[CsvTableSource(read fields: first)])
+DataStreamSink(name=[default_catalog.default_database.MySink1], fields=[first])
+ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[first], source=[CsvTableSource(read fields: first)])
== Physical Execution Plan ==
: Data Source
@@ -17,3 +19,15 @@ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fiel
content : Map
ship_strategy : FORWARD
+ : Operator
+ content : to: Row
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Map
+ ship_strategy : REBALANCE
+
+ : Data Sink
+ content : Sink: CsvTableSink(first)
+ ship_strategy : FORWARD
+
[flink] 04/05: [FLINK-17267] [table] Introduce
TableEnvironment#explainSql api
Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a5fc5f28a1c3569da8de953f6cd1fad5371f6a4
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Apr 29 10:57:32 2020 +0800
[FLINK-17267] [table] Introduce TableEnvironment#explainSql api
---
flink-python/pyflink/table/__init__.py | 4 +-
flink-python/pyflink/table/explain_detail.py | 34 ++++++++++++
flink-python/pyflink/table/table_environment.py | 20 ++++++-
.../table/tests/test_table_environment_api.py | 29 +++++++++-
flink-python/pyflink/util/utils.py | 20 +++++++
.../org/apache/flink/table/api/ExplainDetail.java} | 45 +++++-----------
.../apache/flink/table/api/TableEnvironment.java | 15 +++++-
.../table/api/internal/TableEnvironmentImpl.java | 35 +++++++++++--
.../org/apache/flink/table/delegation/Planner.java | 7 +--
.../org/apache/flink/table/utils/PlannerMock.java | 3 +-
.../table/planner/delegation/BatchPlanner.scala | 8 +--
.../table/planner/delegation/StreamPlanner.scala | 11 ++--
.../resources/explain/testExplainSqlWithInsert.out | 31 +++++++++++
.../resources/explain/testExplainSqlWithSelect.out | 21 ++++++++
.../flink/table/api/TableEnvironmentTest.scala | 57 ++++++++++++++++++++
.../flink/table/planner/utils/TableTestBase.scala | 2 +-
.../table/api/internal/BatchTableEnvImpl.scala | 20 +++++--
.../flink/table/api/internal/TableEnvImpl.scala | 18 +++++--
.../apache/flink/table/planner/StreamPlanner.scala | 2 +-
.../api/batch/BatchTableEnvironmentTest.scala | 61 +++++++++++++++++++++-
.../api/stream/StreamTableEnvironmentTest.scala | 58 ++++++++++++++++++++
.../flink/table/utils/MockTableEnvironment.scala | 4 +-
.../scala/resources/testExplainSqlWithInsert0.out | 31 +++++++++++
.../scala/resources/testExplainSqlWithInsert1.out | 43 +++++++++++++++
.../scala/resources/testExplainSqlWithSelect0.out | 21 ++++++++
.../scala/resources/testExplainSqlWithSelect1.out | 27 ++++++++++
26 files changed, 562 insertions(+), 65 deletions(-)
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 140c6b3..1e367f3 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -70,6 +70,7 @@ from pyflink.table.sources import TableSource, CsvTableSource
from pyflink.table.types import DataTypes, UserDefinedType, Row
from pyflink.table.table_schema import TableSchema
from pyflink.table.udf import FunctionContext, ScalarFunction
+from pyflink.table.explain_detail import ExplainDetail
__all__ = [
'TableEnvironment',
@@ -93,5 +94,6 @@ __all__ = [
'TableSchema',
'FunctionContext',
'ScalarFunction',
- 'SqlDialect'
+ 'SqlDialect',
+ 'ExplainDetail'
]
diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py
new file mode 100644
index 0000000..48e7ce9
--- /dev/null
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -0,0 +1,34 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+__all__ = ['ExplainDetail']
+
+
+class ExplainDetail(object):
+ """
+ ExplainDetail defines the types of details for explain result.
+ """
+
+ # The cost information on physical rel node estimated by optimizer.
+ # e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network,
+ # 0.0 memory}
+ ESTIMATED_COST = 0
+
+ # The changelog traits produced by a physical rel node.
+ # e.g. GroupAggregate(..., changelogMode=[I,UA,D])
+ CHANGELOG_TRAITS = 1
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index d8e8c51..94ff785 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -36,7 +36,8 @@ from pyflink.table import Table
from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \
_infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema
from pyflink.util import utils
-from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class
+from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class, \
+ to_j_explain_detail_arr
__all__ = [
'BatchTableEnvironment',
@@ -468,6 +469,23 @@ class TableEnvironment(object):
else:
return self._j_tenv.explain(table._j_table, extended)
+ def explain_sql(self, stmt, *extra_details):
+ """
+ Returns the AST of the specified statement and the execution plan to compute
+ the result of the given statement.
+
+ :param stmt: The statement for which the AST and execution plan will be returned.
+ :type stmt: str
+ :param extra_details: The extra explain details which the explain result should include,
+ e.g. estimated cost, change log trait for streaming
+ :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
+ :return: The AST and the execution plan of the given statement.
+ :rtype: str
+ """
+
+ j_extra_details = to_j_explain_detail_arr(extra_details)
+ return self._j_tenv.explainSql(stmt, j_extra_details)
+
def sql_query(self, query):
"""
Evaluates a SQL query on registered tables and retrieves the result as a
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 87c8023..bd279af 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -35,8 +35,8 @@ from pyflink.table.types import RowType
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase, \
PyFlinkBlinkBatchTableTestCase
-from pyflink.util.exceptions import TableException
from pyflink.util.utils import get_j_env_configuration
+from pyflink.table.explain_detail import ExplainDetail
class TableEnvironmentTest(object):
@@ -242,6 +242,33 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
actual = t_env.explain(extended=True)
assert isinstance(actual, str)
+ def test_explain_sql_without_explain_detail(self):
+ t_env = self.t_env
+ source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+ field_names = ["a", "b", "c"]
+ field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+ t_env.register_table_sink(
+ "sinks",
+ source_sink_utils.TestAppendSink(field_names, field_types))
+
+ result = t_env.explain_sql("select a + 1, b, c from %s" % source)
+
+ assert isinstance(result, str)
+
+ def test_explain_sql_with_explain_detail(self):
+ t_env = self.t_env
+ source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+ field_names = ["a", "b", "c"]
+ field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+ t_env.register_table_sink(
+ "sinks",
+ source_sink_utils.TestAppendSink(field_names, field_types))
+
+ result = t_env.explain_sql(
+ "select a + 1, b, c from %s" % source, ExplainDetail.ESTIMATED_COST)
+
+ assert isinstance(result, str)
+
def test_create_table_environment(self):
table_config = TableConfig()
table_config.set_max_generated_code_length(32000)
diff --git a/flink-python/pyflink/util/utils.py b/flink-python/pyflink/util/utils.py
index 89db742..29a20da 100644
--- a/flink-python/pyflink/util/utils.py
+++ b/flink-python/pyflink/util/utils.py
@@ -125,3 +125,23 @@ def add_jars_to_context_class_loader(jar_urls):
new_classloader = gateway.jvm.java.net.URLClassLoader(
to_jarray(gateway.jvm.java.net.URL, j_urls), context_classloader)
gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)
+
+
+def to_j_explain_detail_arr(p_extra_details):
+ # sphinx will check "import loop" when generating doc,
+ # use local import to avoid above error
+ from pyflink.table.explain_detail import ExplainDetail
+ gateway = get_gateway()
+
+ def to_j_explain_detail(p_extra_detail):
+ if p_extra_detail == ExplainDetail.CHANGELOG_TRAITS:
+ return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_TRAITS
+ else:
+ return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
+
+ _len = len(p_extra_details) if p_extra_details else 0
+ j_arr = gateway.new_array(gateway.jvm.org.apache.flink.table.api.ExplainDetail, _len)
+ for i in range(0, _len):
+ j_arr[i] = to_j_explain_detail(p_extra_details[i])
+
+ return j_arr
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
similarity index 50%
copy from flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
index 92f50ff..6e9d014 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
@@ -16,38 +16,21 @@
* limitations under the License.
*/
-package org.apache.flink.table.utils;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.operations.ModifyOperation;
-import org.apache.flink.table.operations.Operation;
-
-import java.util.List;
+package org.apache.flink.table.api;
/**
- * Mocking {@link Planner} for tests.
+ * ExplainDetail defines the types of details for explain result.
*/
-public class PlannerMock implements Planner {
-
- @Override
- public Parser getParser() {
- return new ParserMock();
- }
-
- @Override
- public List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
- return null;
- }
-
- @Override
- public String explain(List<Operation> operations, boolean extended) {
- return null;
- }
-
- @Override
- public String[] getCompletionHints(String statement, int position) {
- return new String[0];
- }
+public enum ExplainDetail {
+ /**
+ * The cost information on physical rel node estimated by optimizer.
+ * e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}
+ */
+ ESTIMATED_COST,
+
+ /**
+ * The changelog traits produced by a physical rel node.
+ * e.g. GroupAggregate(..., changelogMode=[I,UA,D])
+ */
+ CHANGELOG_TRAITS
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index d855b21..12d21ec 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -828,7 +828,7 @@ public interface TableEnvironment {
* the result of the given {@link Table}.
*
* @param table The table for which the AST and execution plan will be returned.
- * @param extended if the plan should contain additional properties such as
+ * @param extended if the plan should contain additional properties,
* e.g. estimated cost, traits
*/
String explain(Table table, boolean extended);
@@ -837,12 +837,23 @@ public interface TableEnvironment {
* Returns the AST of the specified Table API and SQL queries and the execution plan to compute
* the result of multiple-sinks plan.
*
- * @param extended if the plan should contain additional properties such as
+ * @param extended if the plan should contain additional properties,
* e.g. estimated cost, traits
*/
String explain(boolean extended);
/**
+ * Returns the AST of the specified statement and the execution plan to compute
+ * the result of the given statement.
+ *
+ * @param statement The statement for which the AST and execution plan will be returned.
+ * @param extraDetails The extra explain details which the explain result should include,
+ * e.g. estimated cost, change log trait for streaming
+ * @return AST and the execution plan.
+ */
+ String explainSql(String statement, ExplainDetail... extraDetails);
+
+ /**
* Returns completion hints for the given statement at the given cursor position.
* The completion happens case insensitively.
*
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 1ca045b..610627c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.dag.Transformation;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.Table;
@@ -136,6 +137,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
protected final FunctionCatalog functionCatalog;
protected final Planner planner;
protected final Parser parser;
+ private final boolean isStreamingMode;
private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
"INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " +
@@ -179,6 +181,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
this.functionCatalog = functionCatalog;
this.planner = planner;
this.parser = planner.getParser();
+ this.isStreamingMode = isStreamingMode;
this.operationTreeBuilder = OperationTreeBuilder.create(
tableConfig,
functionCatalog.asLookup(parser::parseIdentifier),
@@ -589,14 +592,25 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
@Override
public String explain(Table table, boolean extended) {
- return planner.explain(Collections.singletonList(table.getQueryOperation()), extended);
+ return planner.explain(Collections.singletonList(table.getQueryOperation()), getExplainDetails(extended));
}
@Override
public String explain(boolean extended) {
List<Operation> operations = bufferedModifyOperations.stream()
- .map(o -> (Operation) o).collect(Collectors.toList());
- return planner.explain(operations, extended);
+ .map(o -> (Operation) o).collect(Collectors.toList());
+ return planner.explain(operations, getExplainDetails(extended));
+ }
+
+ @Override
+ public String explainSql(String statement, ExplainDetail... extraDetails) {
+ List<Operation> operations = parser.parse(statement);
+
+ if (operations.size() != 1) {
+ throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
+ }
+
+ return planner.explain(operations, extraDetails);
}
@Override
@@ -854,7 +868,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
} else if (operation instanceof ShowViewsOperation) {
return buildShowResult(listViews());
} else if (operation instanceof ExplainOperation) {
- String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()), false);
+ String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()));
return TableResultImpl.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
@@ -979,6 +993,19 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
bufferedModifyOperations.addAll(modifyOperations);
}
+ @VisibleForTesting
+ protected ExplainDetail[] getExplainDetails(boolean extended) {
+ if (extended) {
+ if (isStreamingMode) {
+ return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS };
+ } else {
+ return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST };
+ }
+ } else {
+ return new ExplainDetail[0];
+ }
+ }
+
private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
validateTableSource(tableSource);
ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(name));
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
index 5bb9266..d926e3a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.delegation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
@@ -79,10 +80,10 @@ public interface Planner {
*
* @param operations The collection of relational queries for which the AST
* and execution plan will be returned.
- * @param extended if the plan should contain additional properties such as
- * e.g. estimated cost, traits
+ * @param extraDetails The extra explain details which the explain result should include,
+ * e.g. estimated cost, change log trait for streaming
*/
- String explain(List<Operation> operations, boolean extended);
+ String explain(List<Operation> operations, ExplainDetail... extraDetails);
/**
* Returns completion hints for the given statement at the given cursor position.
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
index 92f50ff..42b5403 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.utils;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.operations.ModifyOperation;
@@ -42,7 +43,7 @@ public class PlannerMock implements Planner {
}
@Override
- public String explain(List<Operation> operations, boolean extended) {
+ public String explain(List<Operation> operations, ExplainDetail... extraDetails) {
return null;
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 9161753..f97e015 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.delegation
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
import org.apache.flink.table.delegation.Executor
import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
@@ -78,7 +78,7 @@ class BatchPlanner(
}
}
- override def explain(operations: util.List[Operation], extended: Boolean): String = {
+ override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
require(operations.nonEmpty, "operations should not be empty")
val sinkRelNodes = operations.map {
case queryOperation: QueryOperation =>
@@ -122,10 +122,10 @@ class BatchPlanner(
sb.append("== Optimized Logical Plan ==")
sb.append(System.lineSeparator)
- val explainLevel = if (extended) {
+ val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) {
SqlExplainLevel.ALL_ATTRIBUTES
} else {
- SqlExplainLevel.DIGEST_ATTRIBUTES
+ SqlExplainLevel.EXPPLAN_ATTRIBUTES
}
sb.append(ExecNodePlanDumper.dagToString(execNodes, explainLevel))
sb.append(System.lineSeparator)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 7006533..959de06 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.delegation
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
import org.apache.flink.table.delegation.Executor
import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
@@ -69,7 +69,7 @@ class StreamPlanner(
}
}
- override def explain(operations: util.List[Operation], extended: Boolean): String = {
+ override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
require(operations.nonEmpty, "operations should not be empty")
val sinkRelNodes = operations.map {
case queryOperation: QueryOperation =>
@@ -109,11 +109,12 @@ class StreamPlanner(
sb.append("== Optimized Logical Plan ==")
sb.append(System.lineSeparator)
- val (explainLevel, withChangelogTraits) = if (extended) {
- (SqlExplainLevel.ALL_ATTRIBUTES, true)
+ val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) {
+ SqlExplainLevel.ALL_ATTRIBUTES
} else {
- (SqlExplainLevel.DIGEST_ATTRIBUTES, false)
+ SqlExplainLevel.DIGEST_ATTRIBUTES
}
+ val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_TRAITS)
sb.append(ExecNodePlanDumper.dagToString(
execNodes,
explainLevel,
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
new file mode 100644
index 0000000..870269f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, b], where=[>(a, 10)])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : Source: Custom Source
+
+ : Operator
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select=[a, b], where=[(a > 10)])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: Unnamed
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
new file mode 100644
index 0000000..0c87ae3
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[a, b, c], where=[>(a, 10)], changelogMode=[I])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : Source: Custom Source
+
+ : Operator
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select=[a, b, c], where=[(a > 10)])
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index a27b47a..0e197ba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -894,6 +894,63 @@ class TableEnvironmentTest {
testUnsupportedExplain("explain plan as json for select * from MyTable")
}
+ @Test
+ def testExplainSqlWithSelect(): Unit = {
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val actual = tableEnv.explainSql(
+ "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_TRAITS)
+ val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ }
+
+ @Test
+ def testExplainSqlWithInsert(): Unit = {
+ val createTableStmt1 =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt1)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val createTableStmt2 =
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult2 = tableEnv.executeSql(createTableStmt2)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+ val actual = tableEnv.explainSql(
+ "insert into MySink select a, b from MyTable where a > 10")
+ val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithInsert.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ }
+
private def testUnsupportedExplain(explain: String): Unit = {
try {
tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 4cd0f5d..2d50f43 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1091,7 +1091,7 @@ class TestingTableEnvironment private(
}
override def explain(extended: Boolean): String = {
- planner.explain(bufferedOperations.toList, extended)
+ planner.explain(bufferedOperations.toList, getExplainDetails(extended): _*)
}
@throws[Exception]
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index efc38a5..b3caf20 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -217,16 +217,20 @@ abstract class BatchTableEnvImpl(
* @param extended Flag to include detailed optimizer estimates.
*/
private[flink] def explain(table: Table, extended: Boolean): String = {
- explain(JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), extended)
+ explain(
+ JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]),
+ getExplainDetails(extended): _*)
}
override def explain(table: Table): String = explain(table: Table, extended = false)
override def explain(extended: Boolean): String = {
- explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
+ explain(
+ bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava,
+ getExplainDetails(extended): _*)
}
- protected def explain(operations: JList[Operation], extended: Boolean): String = {
+ protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String = {
require(operations.asScala.nonEmpty, "operations should not be empty")
val astList = operations.asScala.map {
case queryOperation: QueryOperation =>
@@ -285,6 +289,8 @@ abstract class BatchTableEnvImpl(
val env = dataSinks.head.getDataSet.getExecutionEnvironment
val jasonSqlPlan = env.getExecutionPlan
+ // keep the behavior as before
+ val extended = extraDetails.contains(ExplainDetail.ESTIMATED_COST)
val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
s"== Abstract Syntax Tree ==" +
@@ -597,6 +603,14 @@ abstract class BatchTableEnvImpl(
TableSchema.builder().fields(originalNames, fieldTypes).build()
}
+ private def getExplainDetails(extended: Boolean): Array[ExplainDetail] = {
+ if (extended) {
+ Array(ExplainDetail.ESTIMATED_COST)
+ } else {
+ Array.empty
+ }
+ }
+
protected def createDummyBatchTableEnv(): BatchTableEnvImpl
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 7c6f144..1f01186 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -762,14 +762,13 @@ abstract class TableEnvImpl(
case _: ShowViewsOperation =>
buildShowResult(listViews())
case explainOperation: ExplainOperation =>
- val explanation = explain(
- JCollections.singletonList(explainOperation.getChild),
- extended = false)
+ val explanation = explain(JCollections.singletonList(explainOperation.getChild))
TableResultImpl.builder.
resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
.data(JCollections.singletonList(Row.of(explanation)))
.build
+
case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
}
}
@@ -1135,7 +1134,18 @@ abstract class TableEnvImpl(
}
}
- protected def explain(operations: JList[Operation], extended: Boolean): String
+ override def explainSql(statement: String, extraDetails: ExplainDetail*): String = {
+ val operations = parser.parse(statement)
+
+ if (operations.size != 1) {
+ throw new TableException(
+ "Unsupported SQL query! explainSql() only accepts a single SQL query.")
+ }
+
+ explain(operations, extraDetails: _*)
+ }
+
+ protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String
override def fromValues(values: Expression*): Table = {
createTable(operationTreeBuilder.values(values: _*))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 756d9ca..d81ca1c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -120,7 +120,7 @@ class StreamPlanner(
}.filter(Objects.nonNull).asJava
}
- override def explain(operations: util.List[Operation], extended: Boolean): String = {
+ override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
require(operations.asScala.nonEmpty, "operations should not be empty")
val astWithUpdatesAsRetractionTuples = operations.asScala.map {
case queryOperation: QueryOperation =>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index d09bd5d..c928314 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -24,8 +24,7 @@ import org.apache.flink.table.api.{ResultKind, TableException}
import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
import org.apache.flink.table.runtime.stream.sql.FunctionITCase.{SimpleScalarFunction, TestUDF}
import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId, _}
import org.apache.flink.types.Row
import org.hamcrest.Matchers.containsString
@@ -447,6 +446,64 @@ class BatchTableEnvironmentTest extends TableTestBase {
"explain plan as json for select * from MyTable")
}
+ @Test
+ def testExplainSqlWithSelect(): Unit = {
+ val util = batchTestUtil()
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val actual = util.tableEnv.explainSql("select * from MyTable where a > 10")
+ val expected = readFromResource("testExplainSqlWithSelect1.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ }
+
+ @Test
+ def testExplainSqlWithInsert(): Unit = {
+ val util = batchTestUtil()
+ val createTableStmt1 =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val createTableStmt2 =
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+ val actual = util.tableEnv.explainSql(
+ "insert into MySink select a, b from MyTable where a > 10")
+ val expected = readFromResource("testExplainSqlWithInsert1.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ }
+
private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
try {
tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 439fadb..25bb536 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -326,6 +326,64 @@ class StreamTableEnvironmentTest extends TableTestBase {
}
}
+ @Test
+ def testExplainSqlWithSelect(): Unit = {
+ val util = streamTestUtil()
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val actual = util.tableEnv.explainSql("select * from MyTable where a > 10")
+ val expected = readFromResource("testExplainSqlWithSelect0.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ }
+
+ @Test
+ def testExplainSqlWithInsert(): Unit = {
+ val util = streamTestUtil()
+ val createTableStmt1 =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val createTableStmt2 =
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+ val actual = util.tableEnv.explainSql(
+ "insert into MySink select a, b from MyTable where a > 10")
+ val expected = readFromResource("testExplainSqlWithInsert0.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ }
+
private def prepareSchemaExpressionParser:
(JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 8a9d9c4..312d980 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.utils
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableResult}
+import org.apache.flink.table.api.{ExplainDetail, Table, TableConfig, TableEnvironment, TableResult}
import org.apache.flink.table.catalog.Catalog
import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.expressions.Expression
@@ -74,6 +74,8 @@ class MockTableEnvironment extends TableEnvironment {
override def explain(extended: Boolean): String = ???
+ override def explainSql(statement: String, extraDetails: ExplainDetail*): String = ???
+
override def getCompletionHints(statement: String, position: Int): Array[String] = ???
override def sqlQuery(query: String): Table = ???
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
new file mode 100644
index 0000000..bbd0f53
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+ LogicalProject(a=[$0], b=[$1])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+ DataStreamCalc(select=[a, b], where=[>(a, 10)])
+ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : from: (a, b)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : where: (>(a, 10)), select: (a, b)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : to: Row
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: Unnamed
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
new file mode 100644
index 0000000..b904056
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
@@ -0,0 +1,43 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+ LogicalProject(a=[$0], b=[$1])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+ DataSetCalc(select=[a, b], where=[>(a, 10)])
+ BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : from: (a, b)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : FlatMap
+ content : where: (>(a, 10)), select: (a, b)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : FlatMap
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : to: Row
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : org.apache.flink.api.java.io.LocalCollectionOutputFormat
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
new file mode 100644
index 0000000..4459ad6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b, c], where=[>(a, 10)])
+ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : Map
+ ship_strategy : FORWARD
+
+ : Operator
+ content : where: (>(a, 10)), select: (a, b, c)
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
new file mode 100644
index 0000000..91e87ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b, c], where=[>(a, 10)])
+ BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : FlatMap
+ content : where: (>(a, 10)), select: (a, b, c)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : FlatMap
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : org.apache.flink.api.java.io.DiscardingOutputFormat
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+