You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/06 12:42:51 UTC
[3/4] flink git commit: [FLINK-5257] [table] Include optimized logical plan in explain().
[FLINK-5257] [table] Include optimized logical plan in explain().
This closes #2949.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f9633cd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f9633cd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f9633cd
Branch: refs/heads/master
Commit: 6f9633cd9d3124e2abe5034806fd48cb130e2d93
Parents: ebe228d
Author: Kurt Young <yk...@gmail.com>
Authored: Tue Dec 6 09:23:22 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 6 11:23:23 2016 +0100
----------------------------------------------------------------------
.../flink/api/table/BatchTableEnvironment.scala | 44 +++++++++++++-------
.../api/table/StreamTableEnvironment.scala | 44 +++++++++++++-------
.../src/test/scala/resources/testFilter0.out | 4 ++
.../src/test/scala/resources/testFilter1.out | 4 ++
.../test/scala/resources/testFilterStream0.out | 4 ++
.../src/test/scala/resources/testJoin0.out | 6 +++
.../src/test/scala/resources/testJoin1.out | 6 +++
.../src/test/scala/resources/testUnion0.out | 5 +++
.../src/test/scala/resources/testUnion1.out | 5 +++
.../test/scala/resources/testUnionStream0.out | 5 +++
10 files changed, 96 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index 24b385c..918b01b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -26,9 +26,9 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{Programs, RuleSet}
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.table.explain.PlanJsonParser
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
@@ -166,22 +166,25 @@ abstract class BatchTableEnvironment(
* @param extended Flag to include detailed optimizer estimates.
*/
private[flink] def explain(table: Table, extended: Boolean): String = {
-
- val ast = RelOptUtil.toString(table.getRelNode)
- val dataSet = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
+ val ast = table.getRelNode
+ val optimizedPlan = optimize(ast)
+ val dataSet = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
dataSet.output(new DiscardingOutputFormat[Row])
val env = dataSet.getExecutionEnvironment
val jasonSqlPlan = env.getExecutionPlan
val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
s"== Abstract Syntax Tree ==" +
- System.lineSeparator +
- s"$ast" +
- System.lineSeparator +
- s"== Physical Execution Plan ==" +
- System.lineSeparator +
- s"$sqlPlan"
-
+ System.lineSeparator +
+ s"${RelOptUtil.toString(ast)}" +
+ System.lineSeparator +
+ s"== Optimized Logical Plan ==" +
+ System.lineSeparator +
+ s"${RelOptUtil.toString(optimizedPlan)}" +
+ System.lineSeparator +
+ s"== Physical Execution Plan ==" +
+ System.lineSeparator +
+ s"$sqlPlan"
}
/**
@@ -275,17 +278,27 @@ abstract class BatchTableEnvironment(
* Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
*
* @param table The root node of the relational expression tree.
- * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
+ * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
* @tparam A The type of the resulting [[DataSet]].
* @return The [[DataSet]] that corresponds to the translated [[Table]].
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+ val dataSetPlan = optimize(table.getRelNode)
+ translate(dataSetPlan)
+ }
+ /**
+ * Translates a logical [[RelNode]] into a [[DataSet]].
+ *
+ * @param logicalPlan The root node of the relational expression tree.
+ * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
+ * @tparam A The type of the resulting [[DataSet]].
+ * @return The [[DataSet]] that corresponds to the translated [[Table]].
+ */
+ protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
validateType(tpe)
- val dataSetPlan = optimize(table.getRelNode)
-
- dataSetPlan match {
+ logicalPlan match {
case node: DataSetRel =>
node.translateToPlan(
this,
@@ -294,5 +307,4 @@ abstract class BatchTableEnvironment(
case _ => ???
}
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index bca8d79..8f00586 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -32,8 +32,8 @@ import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
+import org.apache.flink.api.table.plan.schema.{DataStreamTable, TableSourceTable}
import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
-import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataStreamTable}
import org.apache.flink.api.table.sources.StreamTableSource
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -291,12 +291,24 @@ abstract class StreamTableEnvironment(
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+ val dataStreamPlan = optimize(table.getRelNode)
+ translate(dataStreamPlan)
+ }
- validateType(tpe)
+ /**
+ * Translates a logical [[RelNode]] into a [[DataStream]].
+ *
+ * @param logicalPlan The root node of the relational expression tree.
+ * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
+ * @tparam A The type of the resulting [[DataStream]].
+ * @return The [[DataStream]] that corresponds to the translated [[Table]].
+ */
+ protected def translate[A]
+ (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
- val dataStreamPlan = optimize(table.getRelNode)
+ validateType(tpe)
- dataStreamPlan match {
+ logicalPlan match {
case node: DataStreamRel =>
node.translateToPlan(
this,
@@ -304,7 +316,6 @@ abstract class StreamTableEnvironment(
).asInstanceOf[DataStream[A]]
case _ => ???
}
-
}
/**
@@ -314,10 +325,9 @@ abstract class StreamTableEnvironment(
* @param table The table for which the AST and execution plan will be returned.
*/
def explain(table: Table): String = {
-
- val ast = RelOptUtil.toString(table.getRelNode)
-
- val dataStream = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
+ val ast = table.getRelNode
+ val optimizedPlan = optimize(ast)
+ val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
val env = dataStream.getExecutionEnvironment
val jsonSqlPlan = env.getExecutionPlan
@@ -325,12 +335,16 @@ abstract class StreamTableEnvironment(
val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
s"== Abstract Syntax Tree ==" +
- System.lineSeparator +
- s"$ast" +
- System.lineSeparator +
- s"== Physical Execution Plan ==" +
- System.lineSeparator +
- s"$sqlPlan"
+ System.lineSeparator +
+ s"${RelOptUtil.toString(ast)}" +
+ System.lineSeparator +
+ s"== Optimized Logical Plan ==" +
+ System.lineSeparator +
+ s"${RelOptUtil.toString(optimizedPlan)}" +
+ System.lineSeparator +
+ s"== Physical Execution Plan ==" +
+ System.lineSeparator +
+ s"$sqlPlan"
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
index b3786d9..b6ea86f 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
@@ -2,6 +2,10 @@
LogicalFilter(condition=[=(MOD($0, 2), 0)])
LogicalTableScan(table=[[_DataSetTable_0]])
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
+ DataSetScan(table=[[_DataSetTable_0]])
+
== Physical Execution Plan ==
Stage 3 : Data Source
content : collect elements with CollectionInputFormat
http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
index 1049466..719edd9 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
@@ -2,6 +2,10 @@
LogicalFilter(condition=[=(MOD($0, 2), 0)])
LogicalTableScan(table=[[_DataSetTable_0]])
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
+ DataSetScan(table=[[_DataSetTable_0]])
+
== Physical Execution Plan ==
Stage 3 : Data Source
content : collect elements with CollectionInputFormat
http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
index 20ae2b1..022f6c9 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
@@ -2,6 +2,10 @@
LogicalFilter(condition=[=(MOD($0, 2), 0)])
LogicalTableScan(table=[[_DataStreamTable_0]])
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
+ DataStreamScan(table=[[_DataStreamTable_0]])
+
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
index 11961ef..4f091f6 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
@@ -5,6 +5,12 @@ LogicalProject(a=[$0], c=[$2])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, c])
+ DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
+ DataSetScan(table=[[_DataSetTable_0]])
+ DataSetScan(table=[[_DataSetTable_1]])
+
== Physical Execution Plan ==
Stage 4 : Data Source
content : collect elements with CollectionInputFormat
http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
index c6e8b34..e9dad57 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
@@ -5,6 +5,12 @@ LogicalProject(a=[$0], c=[$2])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, c])
+ DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
+ DataSetScan(table=[[_DataSetTable_0]])
+ DataSetScan(table=[[_DataSetTable_1]])
+
== Physical Execution Plan ==
Stage 4 : Data Source
content : collect elements with CollectionInputFormat
http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
index d17517f..5fbd1b5 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
@@ -3,6 +3,11 @@ LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])
+== Optimized Logical Plan ==
+DataSetUnion(union=[count, word])
+ DataSetScan(table=[[_DataSetTable_0]])
+ DataSetScan(table=[[_DataSetTable_1]])
+
== Physical Execution Plan ==
Stage 3 : Data Source
content : collect elements with CollectionInputFormat
http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
index 875f77b..d7d343b 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
@@ -3,6 +3,11 @@ LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])
+== Optimized Logical Plan ==
+DataSetUnion(union=[count, word])
+ DataSetScan(table=[[_DataSetTable_0]])
+ DataSetScan(table=[[_DataSetTable_1]])
+
== Physical Execution Plan ==
Stage 3 : Data Source
content : collect elements with CollectionInputFormat
http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
index ac3635d..fc83c0d 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -3,6 +3,11 @@ LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataStreamTable_0]])
LogicalTableScan(table=[[_DataStreamTable_1]])
+== Optimized Logical Plan ==
+DataStreamUnion(union=[count, word])
+ DataStreamScan(table=[[_DataStreamTable_0]])
+ DataStreamScan(table=[[_DataStreamTable_1]])
+
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat