You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/08 12:57:30 UTC
[flink] 03/05: [FLINK-17267] [table] TableEnvironment#explainSql supports EXPLAIN statement
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6013a62523bd2fc73b6aa7b3109f6b7c138ac51c
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Apr 22 17:08:00 2020 +0800
[FLINK-17267] [table] TableEnvironment#explainSql supports EXPLAIN statement
---
.../table/api/internal/TableEnvironmentImpl.java | 9 ++
.../flink/table/operations/ExplainOperation.java | 46 ++++++++
.../operations/SqlToOperationConverter.java | 19 ++++
.../table/planner/calcite/FlinkPlannerImpl.scala | 11 +-
.../table/planner/delegation/BatchPlanner.scala | 22 +++-
.../table/planner/delegation/StreamPlanner.scala | 22 +++-
.../explain/testExecuteSqlWithExplainInsert.out | 31 +++++
.../explain/testExecuteSqlWithExplainSelect.out | 21 ++++
.../flink/table/api/TableEnvironmentTest.scala | 115 ++++++++++++++++++-
.../table/sqlexec/SqlToOperationConverter.java | 19 ++++
.../table/api/internal/BatchTableEnvImpl.scala | 71 ++++++++----
.../flink/table/api/internal/TableEnvImpl.scala | 13 ++-
.../flink/table/calcite/FlinkPlannerImpl.scala | 11 +-
.../apache/flink/table/planner/StreamPlanner.scala | 17 ++-
.../flink/table/api/TableEnvironmentITCase.scala | 13 +--
.../api/batch/BatchTableEnvironmentTest.scala | 120 +++++++++++++++++++-
.../api/stream/StreamTableEnvironmentTest.scala | 126 ++++++++++++++++++++-
.../resources/testExecuteSqlWithExplainInsert0.out | 31 +++++
.../resources/testExecuteSqlWithExplainInsert1.out | 36 ++++++
.../resources/testExecuteSqlWithExplainSelect0.out | 21 ++++
.../resources/testExecuteSqlWithExplainSelect1.out | 27 +++++
21 files changed, 744 insertions(+), 57 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 342ee55..1ca045b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -72,6 +72,7 @@ import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CatalogQueryOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
@@ -852,6 +853,14 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
return buildShowResult(listFunctions());
} else if (operation instanceof ShowViewsOperation) {
return buildShowResult(listViews());
+ } else if (operation instanceof ExplainOperation) {
+ String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()), false);
+ return TableResultImpl.builder()
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
+ .data(Collections.singletonList(Row.of(explanation)))
+ .build();
+
} else {
throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
new file mode 100644
index 0000000..c78c78f
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import java.util.Collections;
+
+/**
+ * Operation to describe an EXPLAIN statement.
+ * NOTES: currently, only default behavior(EXPLAIN PLAN FOR xx) is supported.
+ */
+public class ExplainOperation implements Operation {
+ private final Operation child;
+
+ public ExplainOperation(Operation child) {
+ this.child = child;
+ }
+
+ public Operation getChild() {
+ return child;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return OperationUtils.formatWithChildren(
+ "EXPLAIN PLAN FOR",
+ Collections.emptyMap(),
+ Collections.singletonList(child),
+ Operation::asSummaryString);
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index b303570..9f80aa1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -62,6 +62,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
import org.apache.flink.table.operations.ShowDatabasesOperation;
@@ -98,6 +99,9 @@ import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
@@ -195,6 +199,8 @@ public class SqlToOperationConverter {
return Optional.of(converter.convertDropView((SqlDropView) validated));
} else if (validated instanceof SqlShowViews) {
return Optional.of(converter.convertShowViews((SqlShowViews) validated));
+ } else if (validated instanceof SqlExplain) {
+ return Optional.of(converter.convertExplain((SqlExplain) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
@@ -577,6 +583,19 @@ public class SqlToOperationConverter {
return new ShowViewsOperation();
}
+ /** Convert EXPLAIN statement. */
+ private Operation convertExplain(SqlExplain sqlExplain) {
+ Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
+
+ if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES ||
+ sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL ||
+ sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
+ throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
+ }
+
+ return new ExplainOperation(operation);
+ }
+
/** Fallback method for sql query. */
private Operation convertSqlQuery(SqlNode node) {
return toQueryOperation(flinkPlanner, node);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index c3f72b3..846f536 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -30,7 +30,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
-import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
import java.lang.{Boolean => JBoolean}
@@ -129,7 +129,14 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowViews]) {
return sqlNode
}
- validator.validate(sqlNode)
+ sqlNode match {
+ case explain: SqlExplain =>
+ val validated = validator.validate(explain.getExplicandum)
+ explain.setOperand(0, validated)
+ explain
+ case _ =>
+ validator.validate(sqlNode)
+ }
}
catch {
case e: RuntimeException =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 2626809..9161753 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.planner.delegation
import org.apache.flink.api.dag.Transformation
import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.planner.operations.PlannerQueryOperation
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
@@ -32,6 +33,7 @@ import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOp
import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
+import org.apache.calcite.rel.logical.LogicalTableModify
import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
import org.apache.calcite.sql.SqlExplainLevel
@@ -80,7 +82,21 @@ class BatchPlanner(
require(operations.nonEmpty, "operations should not be empty")
val sinkRelNodes = operations.map {
case queryOperation: QueryOperation =>
- getRelBuilder.queryOperation(queryOperation).build()
+ val relNode = getRelBuilder.queryOperation(queryOperation).build()
+ relNode match {
+ // SQL: explain plan for insert into xx
+ case modify: LogicalTableModify =>
+ // convert LogicalTableModify to CatalogSinkModifyOperation
+ val qualifiedName = modify.getTable.getQualifiedName
+ require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+ val modifyOperation = new CatalogSinkModifyOperation(
+ ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+ new PlannerQueryOperation(modify.getInput)
+ )
+ translateToRel(modifyOperation)
+ case _ =>
+ relNode
+ }
case modifyOperation: ModifyOperation =>
translateToRel(modifyOperation)
case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index e6245f2..7006533 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.planner.delegation
import org.apache.flink.api.dag.Transformation
import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.planner.operations.PlannerQueryOperation
import org.apache.flink.table.planner.plan.`trait`._
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.planner.plan.optimize.{Optimizer, StreamCommonSubGraphBasedOptimizer}
@@ -30,6 +31,7 @@ import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOp
import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
+import org.apache.calcite.rel.logical.LogicalTableModify
import org.apache.calcite.sql.SqlExplainLevel
import java.util
@@ -71,7 +73,21 @@ class StreamPlanner(
require(operations.nonEmpty, "operations should not be empty")
val sinkRelNodes = operations.map {
case queryOperation: QueryOperation =>
- getRelBuilder.queryOperation(queryOperation).build()
+ val relNode = getRelBuilder.queryOperation(queryOperation).build()
+ relNode match {
+ // SQL: explain plan for insert into xx
+ case modify: LogicalTableModify =>
+ // convert LogicalTableModify to CatalogSinkModifyOperation
+ val qualifiedName = modify.getTable.getQualifiedName
+ require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+ val modifyOperation = new CatalogSinkModifyOperation(
+ ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+ new PlannerQueryOperation(modify.getInput)
+ )
+ translateToRel(modifyOperation)
+ case _ =>
+ relNode
+ }
case modifyOperation: ModifyOperation =>
translateToRel(modifyOperation)
case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
new file mode 100644
index 0000000..0e5e015
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(d=[$0], e=[$1])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, b], where=[>(a, 10)])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : Source: Custom Source
+
+ : Operator
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select=[a, b], where=[(a > 10)])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : SinkConversionToRow
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: Unnamed
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
new file mode 100644
index 0000000..4865193
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[a, b, c], where=[>(a, 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : Source: Custom Source
+
+ : Operator
+ content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+ ship_strategy : FORWARD
+
+ : Operator
+ content : Calc(select=[a, b, c], where=[(a > 10)])
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index e289edb..a27b47a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -18,8 +18,6 @@
package org.apache.flink.table.api
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.sql.SqlExplainLevel
import org.apache.flink.api.common.typeinfo.Types.STRING
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
@@ -29,10 +27,14 @@ import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
import org.apache.flink.table.planner.operations.SqlConversionException
import org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase.TestUDF
import org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.SimpleScalarFunction
+import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId
import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks}
import org.apache.flink.types.Row
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.sql.SqlExplainLevel
import org.hamcrest.Matchers.containsString
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.rules.ExpectedException
import org.junit.{Rule, Test}
@@ -797,6 +799,113 @@ class TableEnvironmentTest {
tableResult5.collect())
}
+ @Test
+ def testExecuteSqlWithExplainSelect(): Unit = {
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val tableResult2 = tableEnv.executeSql("explain plan for select * from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+ val it = tableResult2.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity)
+ val actual = row.getField(0).toString
+ val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainSelect.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ assertFalse(it.hasNext)
+ }
+
+ @Test
+ def testExecuteSqlWithExplainInsert(): Unit = {
+ val createTableStmt1 =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt1)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val createTableStmt2 =
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult2 = tableEnv.executeSql(createTableStmt2)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+ val tableResult3 = tableEnv.executeSql(
+ "explain plan for insert into MySink select a, b from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+ val it = tableResult3.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity)
+ val actual = row.getField(0).toString
+ val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainInsert.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ assertFalse(it.hasNext)
+ }
+
+ @Test
+ def testExecuteSqlWithUnsupportedExplain(): Unit = {
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ // TODO we can support them later
+ testUnsupportedExplain("explain plan excluding attributes for select * from MyTable")
+ testUnsupportedExplain("explain plan including all attributes for select * from MyTable")
+ testUnsupportedExplain("explain plan with type for select * from MyTable")
+ testUnsupportedExplain("explain plan without implementation for select * from MyTable")
+ testUnsupportedExplain("explain plan as xml for select * from MyTable")
+ testUnsupportedExplain("explain plan as json for select * from MyTable")
+ }
+
+ private def testUnsupportedExplain(explain: String): Unit = {
+ try {
+ tableEnv.executeSql(explain)
+ fail("This should not happen")
+ } catch {
+ case e: TableException =>
+ assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+ case e =>
+ fail("This should not happen, " + e.getMessage)
+ }
+ }
+
private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
while (expected.hasNext && actual.hasNext) {
assertEquals(expected.next(), actual.next())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 8ebb47e..ded0d85 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -61,6 +61,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.PlannerQueryOperation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -91,6 +92,9 @@ import org.apache.flink.util.StringUtils;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
@@ -187,6 +191,8 @@ public class SqlToOperationConverter {
return Optional.of(converter.convertDropView((SqlDropView) validated));
} else if (validated instanceof SqlShowViews) {
return Optional.of(converter.convertShowViews((SqlShowViews) validated));
+ } else if (validated instanceof SqlExplain) {
+ return Optional.of(converter.convertExplain((SqlExplain) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
@@ -534,6 +540,19 @@ public class SqlToOperationConverter {
return new ShowViewsOperation();
}
+ /** Convert EXPLAIN statement. */
+ private Operation convertExplain(SqlExplain sqlExplain) {
+ Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
+
+ if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES ||
+ sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL ||
+ sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
+ throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
+ }
+
+ return new ExplainOperation(operation);
+ }
+
/**
* Create a table schema from {@link SqlCreateTable}. This schema contains computed column
* fields, say, we have a create table DDL statement:
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index e25b8e4..efc38a5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -31,14 +31,14 @@ import org.apache.flink.configuration.DeploymentOptions
import org.apache.flink.core.execution.{DetachedJobExecutionResult, JobClient}
import org.apache.flink.table.api._
import org.apache.flink.table.calcite.{CalciteConfig, FlinkTypeFactory}
-import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogManager}
+import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogManager, ObjectIdentifier}
import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, PlannerQueryOperation, QueryOperation}
import org.apache.flink.table.plan.BatchOptimizer
import org.apache.flink.table.plan.nodes.LogicalSink
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
@@ -57,6 +57,7 @@ import org.apache.flink.util.Preconditions.checkNotNull
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalTableModify
import _root_.java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList}
@@ -225,11 +226,25 @@ abstract class BatchTableEnvImpl(
explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
}
- private def explain(operations: JList[Operation], extended: Boolean): String = {
+ protected def explain(operations: JList[Operation], extended: Boolean): String = {
require(operations.asScala.nonEmpty, "operations should not be empty")
val astList = operations.asScala.map {
case queryOperation: QueryOperation =>
- getRelBuilder.tableOperation(queryOperation).build()
+ val relNode = getRelBuilder.tableOperation(queryOperation).build()
+ relNode match {
+ // SQL: explain plan for insert into xx
+ case modify: LogicalTableModify =>
+ // convert LogicalTableModify to CatalogSinkModifyOperation
+ val qualifiedName = modify.getTable.getQualifiedName
+ require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+ val modifyOperation = new CatalogSinkModifyOperation(
+ ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+ new PlannerQueryOperation(modify.getInput)
+ )
+ translateToRel(modifyOperation, addLogicalSink = true)
+ case _ =>
+ relNode
+ }
case modifyOperation: ModifyOperation =>
translateToRel(modifyOperation, addLogicalSink = true)
case o => throw new TableException(s"Unsupported operation: ${o.asSummaryString()}")
@@ -237,27 +252,33 @@ abstract class BatchTableEnvImpl(
val optimizedNodes = astList.map(optimizer.optimize)
- val batchTableEnv = createDummyBatchTableEnv()
- val dataSinks = optimizedNodes.zip(operations.asScala).map {
- case (optimizedNode, operation) =>
- operation match {
- case queryOperation: QueryOperation =>
- val dataSet = translate[Row](
- optimizedNode,
- getTableSchema(queryOperation.getTableSchema.getFieldNames, optimizedNode))(
- new GenericTypeInfo(classOf[Row]))
- dataSet.output(new DiscardingOutputFormat[Row])
- case modifyOperation: ModifyOperation =>
- val tableSink = getTableSink(modifyOperation)
- translate(
- batchTableEnv,
- optimizedNode,
- tableSink,
- getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
- case o =>
- throw new TableException("Unsupported Operation: " + o.asSummaryString())
- }
- }
+ val batchTableEnv = createDummyBatchTableEnv()
+ val dataSinks = optimizedNodes.zip(operations.asScala).map {
+ case (optimizedNode, operation) =>
+ operation match {
+ case queryOperation: QueryOperation =>
+ val fieldNames = queryOperation match {
+ case o: PlannerQueryOperation if o.getCalciteTree.isInstanceOf[LogicalTableModify] =>
+ o.getCalciteTree.getInput(0).getRowType.getFieldNames.asScala.toArray[String]
+ case _ =>
+ queryOperation.getTableSchema.getFieldNames
+ }
+ val dataSet = translate[Row](
+ optimizedNode,
+ getTableSchema(fieldNames, optimizedNode))(
+ new GenericTypeInfo(classOf[Row]))
+ dataSet.output(new DiscardingOutputFormat[Row])
+ case modifyOperation: ModifyOperation =>
+ val tableSink = getTableSink(modifyOperation)
+ translate(
+ batchTableEnv,
+ optimizedNode,
+ tableSink,
+ getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+ case o =>
+ throw new TableException("Unsupported Operation: " + o.asSummaryString())
+ }
+ }
val astPlan = astList.map(RelOptUtil.toString).mkString(System.lineSeparator)
val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3a97106..7c6f144 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -545,8 +545,6 @@ abstract class TableEnvImpl(
override def listFunctions(): Array[String] = functionCatalog.getFunctions
- override def explain(table: Table): String
-
override def getCompletionHints(statement: String, position: Int): Array[String] = {
val planner = getFlinkPlanner
planner.getCompletionHints(statement, position)
@@ -763,6 +761,15 @@ abstract class TableEnvImpl(
TableResultImpl.TABLE_RESULT_OK
case _: ShowViewsOperation =>
buildShowResult(listViews())
+ case explainOperation: ExplainOperation =>
+ val explanation = explain(
+ JCollections.singletonList(explainOperation.getChild),
+ extended = false)
+ TableResultImpl.builder.
+ resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
+ .data(JCollections.singletonList(Row.of(explanation)))
+ .build
case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
}
}
@@ -1128,6 +1135,8 @@ abstract class TableEnvImpl(
}
}
+ protected def explain(operations: JList[Operation], extended: Boolean): String // no body here; invoked for ExplainOperation with extended = false
+
override def fromValues(values: Expression*): Table = {
createTable(operationTreeBuilder.values(values: _*))
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index b4c6210..3d7dae4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.rel.RelRoot
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
-import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
import _root_.java.lang.{Boolean => JBoolean}
@@ -127,7 +127,14 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowViews]) {
return sqlNode
}
- validator.validate(sqlNode)
+ sqlNode match {
+ case explain: SqlExplain =>
+ val validated = validator.validate(explain.getExplicandum)
+ explain.setOperand(0, validated)
+ explain
+ case _ =>
+ validator.validate(sqlNode)
+ }
}
catch {
case e: RuntimeException =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index f549168..756d9ca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -43,6 +43,7 @@ import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalTableModify
import _root_.java.util
import _root_.java.util.Objects
@@ -123,7 +124,21 @@ class StreamPlanner(
require(operations.asScala.nonEmpty, "operations should not be empty")
val astWithUpdatesAsRetractionTuples = operations.asScala.map {
case queryOperation: QueryOperation =>
- (getRelBuilder.tableOperation(queryOperation).build(), false)
+ val relNode = getRelBuilder.tableOperation(queryOperation).build()
+ relNode match {
+ // SQL: explain plan for insert into xx
+ case modify: LogicalTableModify =>
+ // convert LogicalTableModify to CatalogSinkModifyOperation
+ val qualifiedName = modify.getTable.getQualifiedName
+ require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+ val modifyOperation = new CatalogSinkModifyOperation(
+ ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+ new PlannerQueryOperation(modify.getInput)
+ )
+ translateToRel(modifyOperation)
+ case _ =>
+ (relNode, false)
+ }
case modifyOperation: ModifyOperation =>
translateToRel(modifyOperation)
case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index c5e90bf..dd83a3e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -24,13 +24,14 @@ import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecutionEnvironment}
-import org.apache.flink.table.api.TableEnvironmentITCase.{getPersonCsvTableSource, getPersonData, readFromResource, replaceStageId}
+import org.apache.flink.table.api.TableEnvironmentITCase.{getPersonCsvTableSource, getPersonData}
import org.apache.flink.table.api.internal.TableEnvironmentImpl
import org.apache.flink.table.api.java.StreamTableEnvironment
import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnvironment, _}
import org.apache.flink.table.runtime.utils.StreamITCase
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
import org.apache.flink.table.utils.TestingOverwritableTableSink
import org.apache.flink.types.Row
import org.apache.flink.util.FileUtils
@@ -46,7 +47,6 @@ import _root_.java.lang.{Long => JLong}
import _root_.java.util
import _root_.scala.collection.mutable
-import _root_.scala.io.Source
@RunWith(classOf[Parameterized])
@@ -452,15 +452,6 @@ object TableEnvironmentITCase {
)
}
- def readFromResource(file: String): String = {
- val source = s"${getClass.getResource("/").getFile}../../src/test/scala/resources/$file"
- Source.fromFile(source).mkString
- }
-
- def replaceStageId(s: String): String = {
- s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
- }
-
def getPersonCsvTableSource: CsvTableSource = {
val header = "First#Id#Score#Last"
val csvRecords = getPersonData.map {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index 9823e08..d09bd5d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -25,10 +25,11 @@ import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
import org.apache.flink.table.runtime.stream.sql.FunctionITCase.{SimpleScalarFunction, TestUDF}
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
import org.apache.flink.types.Row
import org.hamcrest.Matchers.containsString
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.Test
import java.util
@@ -341,6 +342,123 @@ class BatchTableEnvironmentTest extends TableTestBase {
tableResult4.collect())
}
+ @Test // "explain plan for <select>" via executeSql must return the plan text as one single-field row
+ def testExecuteSqlWithExplainSelect(): Unit = {
+ val util = batchTestUtil()
+ val createTableStmt = // DDL for MyTable, the table read by the explained query
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val tableResult2 = util.tableEnv.executeSql(
+ "explain plan for select * from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+ val it = tableResult2.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity) // the row carries a single field: the explanation string
+ val actual = row.getField(0).toString
+ val expected = readFromResource("testExecuteSqlWithExplainSelect1.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual)) // both sides go through replaceStageId first
+ assertFalse(it.hasNext) // exactly one row is expected
+ }
+
+ @Test // "explain plan for insert into ..." via executeSql must return the plan text as one single-field row
+ def testExecuteSqlWithExplainInsert(): Unit = {
+ val util = batchTestUtil()
+ val createTableStmt1 = // DDL for MyTable, the table the explained INSERT selects from
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val createTableStmt2 = // DDL for MySink, the target table of the explained INSERT
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+ val tableResult3 = util.tableEnv.executeSql(
+ "explain plan for insert into MySink select a, b from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+ val it = tableResult3.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity) // the row carries a single field: the explanation string
+ val actual = row.getField(0).toString
+ val expected = readFromResource("testExecuteSqlWithExplainInsert1.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual)) // both sides go through replaceStageId first
+ assertFalse(it.hasNext) // exactly one row is expected
+ }
+
+ @Test // every non-default EXPLAIN variant below must be rejected by executeSql
+ def testExecuteSqlWithUnsupportedExplain(): Unit = {
+ val util = batchTestUtil()
+ val createTableStmt = // DDL for MyTable, referenced by every EXPLAIN statement below
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ // TODO: these EXPLAIN options (detail level, depth, output format) are not supported yet;
+ // each statement is expected to fail in testUnsupportedExplain until support is added.
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan excluding attributes for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan including all attributes for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan with type for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan without implementation for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan as xml for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan as json for select * from MyTable")
+ }
+
+ /**
+   * Executes `explain` and asserts that it is rejected with a [[TableException]] whose
+   * message contains "Only default behavior is supported now".
+   *
+   * @param tableEnv the environment the statement is executed against
+   * @param explain  the EXPLAIN statement that is expected to be unsupported
+   */
+ private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
+   try {
+     tableEnv.executeSql(explain)
+     fail("This should not happen")
+   } catch {
+     case e: TableException =>
+       assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+     // Match Exception instead of everything: the AssertionError raised by fail() above is an
+     // Error and must propagate untouched rather than being caught and re-wrapped here.
+     case e: Exception =>
+       fail("This should not happen, " + e.getMessage)
+   }
+ }
+
private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
while (expected.hasNext && actual.hasNext) {
assertEquals(expected.next(), actual.next())
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 809d0aa..439fadb 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -29,20 +29,21 @@ import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => JStreamTableEnvironmentImpl}
import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{EnvironmentSettings, Expressions, TableConfig, Types, ValidationException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.api.{EnvironmentSettings, ResultKind, TableConfig, TableException, Types, ValidationException}
+import org.apache.flink.table.catalog.FunctionCatalog
import org.apache.flink.table.executor.StreamExecutor
+import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.planner.StreamPlanner
import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.TableTestUtil.{binaryNode, readFromResource, replaceStageId, streamTableNode, term, unaryNode}
import org.apache.flink.table.utils.{CatalogManagerMocks, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
import org.apache.flink.types.Row
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.Test
import org.mockito.Mockito.{mock, when}
import java.lang.{Integer => JInt, Long => JLong}
-import org.apache.flink.table.module.ModuleManager
class StreamTableEnvironmentTest extends TableTestBase {
@@ -208,6 +209,123 @@ class StreamTableEnvironmentTest extends TableTestBase {
jTEnv.fromDataStream(ds, $("rt").rowtime(), $("b"), $("c"), $("d"), $("e"), $("pt").proctime())
}
+ @Test // "explain plan for <select>" via executeSql must return the plan text as one single-field row
+ def testExecuteSqlWithExplainSelect(): Unit = {
+ val util = streamTestUtil()
+ val createTableStmt = // DDL for MyTable, the table read by the explained query
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val tableResult2 = util.tableEnv.executeSql(
+ "explain plan for select * from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+ val it = tableResult2.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity) // the row carries a single field: the explanation string
+ val actual = row.getField(0).toString
+ val expected = readFromResource("testExecuteSqlWithExplainSelect0.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual)) // both sides go through replaceStageId first
+ assertFalse(it.hasNext) // exactly one row is expected
+ }
+
+ @Test // "explain plan for insert into ..." via executeSql must return the plan text as one single-field row
+ def testExecuteSqlWithExplainInsert(): Unit = {
+ val util = streamTestUtil()
+ val createTableStmt1 = // DDL for MyTable, the table the explained INSERT selects from
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val createTableStmt2 = // DDL for MySink, the target table of the explained INSERT
+ """
+ |CREATE TABLE MySink (
+ | d bigint,
+ | e int
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+ assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+ val tableResult3 = util.tableEnv.executeSql(
+ "explain plan for insert into MySink select a, b from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+ val it = tableResult3.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity) // the row carries a single field: the explanation string
+ val actual = row.getField(0).toString
+ val expected = readFromResource("testExecuteSqlWithExplainInsert0.out")
+ assertEquals(replaceStageId(expected), replaceStageId(actual)) // both sides go through replaceStageId first
+ assertFalse(it.hasNext) // exactly one row is expected
+ }
+
+ @Test // every non-default EXPLAIN variant below must be rejected by executeSql
+ def testExecuteSqlWithUnsupportedExplain(): Unit = {
+ val util = streamTestUtil()
+ val createTableStmt = // DDL for MyTable, referenced by every EXPLAIN statement below
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ // TODO: these EXPLAIN options (detail level, depth, output format) are not supported yet;
+ // each statement is expected to fail in testUnsupportedExplain until support is added.
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan excluding attributes for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan including all attributes for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan with type for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan without implementation for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan as xml for select * from MyTable")
+ testUnsupportedExplain(util.tableEnv,
+ "explain plan as json for select * from MyTable")
+ }
+
+ /**
+   * Executes `explain` and asserts that it is rejected with a [[TableException]] whose
+   * message contains "Only default behavior is supported now".
+   *
+   * @param tableEnv the environment the statement is executed against
+   * @param explain  the EXPLAIN statement that is expected to be unsupported
+   */
+ private def testUnsupportedExplain(tableEnv: StreamTableEnvironment, explain: String): Unit = {
+   try {
+     tableEnv.executeSql(explain)
+     fail("This should not happen")
+   } catch {
+     case e: TableException =>
+       assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+     // Match Exception instead of everything: the AssertionError raised by fail() above is an
+     // Error and must propagate untouched rather than being caught and re-wrapped here.
+     case e: Exception =>
+       fail("This should not happen, " + e.getMessage)
+   }
+ }
+
private def prepareSchemaExpressionParser:
(JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
new file mode 100644
index 0000000..aad6387
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+ LogicalProject(d=[$0], e=[$1])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+ DataStreamCalc(select=[a, b], where=[>(a, 10)])
+ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : from: (a, b)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : where: (>(a, 10)), select: (a, b)
+ ship_strategy : FORWARD
+
+ : Operator
+ content : to: Row
+ ship_strategy : FORWARD
+
+ : Data Sink
+ content : Sink: Unnamed
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
new file mode 100644
index 0000000..98206ae
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
@@ -0,0 +1,36 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+ LogicalProject(d=[$0], e=[$1])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+ DataSetCalc(select=[a, b], where=[>(a, 10)])
+ BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : Map
+ content : from: (a, b)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : Map
+ Partitioning : RANDOM_PARTITIONED
+
+ : FlatMap
+ content : where: (>(a, 10)), select: (a, b)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : FlatMap
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : org.apache.flink.api.java.io.DiscardingOutputFormat
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
new file mode 100644
index 0000000..4459ad6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b, c], where=[>(a, 10)])
+ StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+
+ : Operator
+ content : Map
+ ship_strategy : FORWARD
+
+ : Operator
+ content : where: (>(a, 10)), select: (a, b, c)
+ ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
new file mode 100644
index 0000000..91e87ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b, c], where=[>(a, 10)])
+ BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+ content : collect elements with CollectionInputFormat
+ Partitioning : RANDOM_PARTITIONED
+
+ : FlatMap
+ content : where: (>(a, 10)), select: (a, b, c)
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ driver_strategy : FlatMap
+ Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+ content : org.apache.flink.api.java.io.DiscardingOutputFormat
+ ship_strategy : Forward
+ exchange_mode : PIPELINED
+ Partitioning : RANDOM_PARTITIONED
+