You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/06 12:42:51 UTC

[3/4] flink git commit: [FLINK-5257] [table] Include optimized logical plan in explain().

[FLINK-5257] [table] Include optimized logical plan in explain().

This closes #2949.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f9633cd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f9633cd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f9633cd

Branch: refs/heads/master
Commit: 6f9633cd9d3124e2abe5034806fd48cb130e2d93
Parents: ebe228d
Author: Kurt Young <yk...@gmail.com>
Authored: Tue Dec 6 09:23:22 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 6 11:23:23 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/BatchTableEnvironment.scala | 44 +++++++++++++-------
 .../api/table/StreamTableEnvironment.scala      | 44 +++++++++++++-------
 .../src/test/scala/resources/testFilter0.out    |  4 ++
 .../src/test/scala/resources/testFilter1.out    |  4 ++
 .../test/scala/resources/testFilterStream0.out  |  4 ++
 .../src/test/scala/resources/testJoin0.out      |  6 +++
 .../src/test/scala/resources/testJoin1.out      |  6 +++
 .../src/test/scala/resources/testUnion0.out     |  5 +++
 .../src/test/scala/resources/testUnion1.out     |  5 +++
 .../test/scala/resources/testUnionStream0.out   |  5 +++
 10 files changed, 96 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index 24b385c..918b01b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -26,9 +26,9 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{Programs, RuleSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
@@ -166,22 +166,25 @@ abstract class BatchTableEnvironment(
     * @param extended Flag to include detailed optimizer estimates.
     */
   private[flink] def explain(table: Table, extended: Boolean): String = {
-
-    val ast = RelOptUtil.toString(table.getRelNode)
-    val dataSet = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
+    val ast = table.getRelNode
+    val optimizedPlan = optimize(ast)
+    val dataSet = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
     dataSet.output(new DiscardingOutputFormat[Row])
     val env = dataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan
     val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
 
     s"== Abstract Syntax Tree ==" +
-    System.lineSeparator +
-    s"$ast" +
-    System.lineSeparator +
-    s"== Physical Execution Plan ==" +
-    System.lineSeparator +
-    s"$sqlPlan"
-
+        System.lineSeparator +
+        s"${RelOptUtil.toString(ast)}" +
+        System.lineSeparator +
+        s"== Optimized Logical Plan ==" +
+        System.lineSeparator +
+        s"${RelOptUtil.toString(optimizedPlan)}" +
+        System.lineSeparator +
+        s"== Physical Execution Plan ==" +
+        System.lineSeparator +
+        s"$sqlPlan"
   }
 
   /**
@@ -275,17 +278,27 @@ abstract class BatchTableEnvironment(
     * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
     *
     * @param table The root node of the relational expression tree.
-    * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
+    * @param tpe   The [[TypeInformation]] of the resulting [[DataSet]].
     * @tparam A The type of the resulting [[DataSet]].
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+    val dataSetPlan = optimize(table.getRelNode)
+    translate(dataSetPlan)
+  }
 
+  /**
+    * Translates a logical [[RelNode]] into a [[DataSet]].
+    *
+    * @param logicalPlan The root node of the relational expression tree.
+    * @param tpe         The [[TypeInformation]] of the resulting [[DataSet]].
+    * @tparam A The type of the resulting [[DataSet]].
+    * @return The [[DataSet]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
     validateType(tpe)
 
-    val dataSetPlan = optimize(table.getRelNode)
-
-    dataSetPlan match {
+    logicalPlan match {
       case node: DataSetRel =>
         node.translateToPlan(
           this,
@@ -294,5 +307,4 @@ abstract class BatchTableEnvironment(
       case _ => ???
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index bca8d79..8f00586 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -32,8 +32,8 @@ import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
+import org.apache.flink.api.table.plan.schema.{DataStreamTable, TableSourceTable}
 import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
-import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataStreamTable}
 import org.apache.flink.api.table.sources.StreamTableSource
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -291,12 +291,24 @@ abstract class StreamTableEnvironment(
     * @return The [[DataStream]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+    val dataStreamPlan = optimize(table.getRelNode)
+    translate(dataStreamPlan)
+  }
 
-    validateType(tpe)
+  /**
+    * Translates a logical [[RelNode]] into a [[DataStream]].
+    *
+    * @param logicalPlan The root node of the relational expression tree.
+    * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
+    * @tparam A The type of the resulting [[DataStream]].
+    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A]
+      (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
 
-   val dataStreamPlan = optimize(table.getRelNode)
+    validateType(tpe)
 
-    dataStreamPlan match {
+    logicalPlan match {
       case node: DataStreamRel =>
         node.translateToPlan(
           this,
@@ -304,7 +316,6 @@ abstract class StreamTableEnvironment(
         ).asInstanceOf[DataStream[A]]
       case _ => ???
     }
-
   }
 
   /**
@@ -314,10 +325,9 @@ abstract class StreamTableEnvironment(
     * @param table The table for which the AST and execution plan will be returned.
     */
   def explain(table: Table): String = {
-
-    val ast = RelOptUtil.toString(table.getRelNode)
-
-    val dataStream = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
+    val ast = table.getRelNode
+    val optimizedPlan = optimize(ast)
+    val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
 
     val env = dataStream.getExecutionEnvironment
     val jsonSqlPlan = env.getExecutionPlan
@@ -325,12 +335,16 @@ abstract class StreamTableEnvironment(
     val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
 
     s"== Abstract Syntax Tree ==" +
-      System.lineSeparator +
-      s"$ast" +
-      System.lineSeparator +
-      s"== Physical Execution Plan ==" +
-      System.lineSeparator +
-      s"$sqlPlan"
+        System.lineSeparator +
+        s"${RelOptUtil.toString(ast)}" +
+        System.lineSeparator +
+        s"== Optimized Logical Plan ==" +
+        System.lineSeparator +
+        s"${RelOptUtil.toString(optimizedPlan)}" +
+        System.lineSeparator +
+        s"== Physical Execution Plan ==" +
+        System.lineSeparator +
+        s"$sqlPlan"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
index b3786d9..b6ea86f 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
@@ -2,6 +2,10 @@
 LogicalFilter(condition=[=(MOD($0, 2), 0)])
   LogicalTableScan(table=[[_DataSetTable_0]])
 
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
+  DataSetScan(table=[[_DataSetTable_0]])
+
 == Physical Execution Plan ==
 Stage 3 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
index 1049466..719edd9 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
@@ -2,6 +2,10 @@
 LogicalFilter(condition=[=(MOD($0, 2), 0)])
   LogicalTableScan(table=[[_DataSetTable_0]])
 
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
+  DataSetScan(table=[[_DataSetTable_0]])
+
 == Physical Execution Plan ==
 Stage 3 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
index 20ae2b1..022f6c9 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
@@ -2,6 +2,10 @@
 LogicalFilter(condition=[=(MOD($0, 2), 0)])
   LogicalTableScan(table=[[_DataStreamTable_0]])
 
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
+  DataStreamScan(table=[[_DataStreamTable_0]])
+
 == Physical Execution Plan ==
 Stage 1 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
index 11961ef..4f091f6 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
@@ -5,6 +5,12 @@ LogicalProject(a=[$0], c=[$2])
       LogicalTableScan(table=[[_DataSetTable_0]])
       LogicalTableScan(table=[[_DataSetTable_1]])
 
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, c])
+  DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
+    DataSetScan(table=[[_DataSetTable_0]])
+    DataSetScan(table=[[_DataSetTable_1]])
+
 == Physical Execution Plan ==
 Stage 4 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
index c6e8b34..e9dad57 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
@@ -5,6 +5,12 @@ LogicalProject(a=[$0], c=[$2])
       LogicalTableScan(table=[[_DataSetTable_0]])
       LogicalTableScan(table=[[_DataSetTable_1]])
 
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, c])
+  DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
+    DataSetScan(table=[[_DataSetTable_0]])
+    DataSetScan(table=[[_DataSetTable_1]])
+
 == Physical Execution Plan ==
 Stage 4 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
index d17517f..5fbd1b5 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
@@ -3,6 +3,11 @@ LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataSetTable_0]])
   LogicalTableScan(table=[[_DataSetTable_1]])
 
+== Optimized Logical Plan ==
+DataSetUnion(union=[count, word])
+  DataSetScan(table=[[_DataSetTable_0]])
+  DataSetScan(table=[[_DataSetTable_1]])
+
 == Physical Execution Plan ==
 Stage 3 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
index 875f77b..d7d343b 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
@@ -3,6 +3,11 @@ LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataSetTable_0]])
   LogicalTableScan(table=[[_DataSetTable_1]])
 
+== Optimized Logical Plan ==
+DataSetUnion(union=[count, word])
+  DataSetScan(table=[[_DataSetTable_0]])
+  DataSetScan(table=[[_DataSetTable_1]])
+
 == Physical Execution Plan ==
 Stage 3 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
index ac3635d..fc83c0d 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -3,6 +3,11 @@ LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataStreamTable_0]])
   LogicalTableScan(table=[[_DataStreamTable_1]])
 
+== Optimized Logical Plan ==
+DataStreamUnion(union=[count, word])
+  DataStreamScan(table=[[_DataStreamTable_0]])
+  DataStreamScan(table=[[_DataStreamTable_1]])
+
 == Physical Execution Plan ==
 Stage 1 : Data Source
 	content : collect elements with CollectionInputFormat