Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/06 12:42:49 UTC

[1/4] flink git commit: [FLINK-5249] [docs] Fix description of datastream rescaling to match the figure.

Repository: flink
Updated Branches:
  refs/heads/master 8134f4433 -> 98d182603


[FLINK-5249] [docs] Fix description of datastream rescaling to match the figure.

This closes #2932.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/96239b30
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/96239b30
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/96239b30

Branch: refs/heads/master
Commit: 96239b3056fa437f450e414d94e61157b77f7e07
Parents: 8134f44
Author: David Anderson <da...@alpinegizmo.com>
Authored: Sun Dec 4 15:32:23 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 6 10:52:50 2016 +0100

----------------------------------------------------------------------
 docs/dev/datastream_api.md | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/96239b30/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 57d2e4b..ef4a394 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -1008,19 +1008,18 @@ dataStream.rebalance();
             The subset of downstream operations to which the upstream operation sends
             elements depends on the degree of parallelism of both the upstream and downstream operation.
             For example, if the upstream operation has parallelism 2 and the downstream operation
-            has parallelism 4, then one upstream operation would distribute elements to two
+            has parallelism 6, then one upstream operation would distribute elements to three
             downstream operations while the other upstream operation would distribute to the other
-            two downstream operations. If, on the other hand, the downstream operation has parallelism
-            2 while the upstream operation has parallelism 4 then two upstream operations would
-            distribute to one downstream operation while the other two upstream operations would
-            distribute to the other downstream operations.
+            three downstream operations. If, on the other hand, the downstream operation has parallelism
+            2 while the upstream operation has parallelism 6 then three upstream operations would
+            distribute to one downstream operation while the other three upstream operations would
+            distribute to the other downstream operation.
         </p>
         <p>
             In cases where the different parallelisms are not multiples of each other one or several
             downstream operations will have a differing number of inputs from upstream operations.
-
-        </p>
         </p>
+        <p>
             Please see this figure for a visualization of the connection pattern in the above
             example:
         </p>
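
For illustration, here is a minimal sketch of a pipeline that produces the corrected 2-to-6 fan-out described in the patched text. This is not from the patch: the operator bodies and parallelism values are made up, and it assumes the Scala DataStream API of this era.

import org.apache.flink.streaming.api.scala._

object RescaleExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(1, 2, 3, 4, 5, 6)
      .map(_ * 2).setParallelism(2)  // upstream operation: parallelism 2
      .rescale                       // each upstream subtask feeds 3 of the 6 subtasks below
      .print().setParallelism(6)     // downstream operation: parallelism 6

    env.execute("rescale fan-out example")
  }
}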


[4/4] flink git commit: [FLINK-5251] [table] Decouple StreamTableSourceScan from TableSourceTable.

Posted by fh...@apache.org.
[FLINK-5251] [table] Decouple StreamTableSourceScan from TableSourceTable.

This closes #2934.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98d18260
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98d18260
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98d18260

Branch: refs/heads/master
Commit: 98d1826030d1486a3d64466aff1c909a41e2de10
Parents: 6f9633c
Author: Kurt Young <yk...@gmail.com>
Authored: Mon Dec 5 09:43:13 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 6 11:23:28 2016 +0100

----------------------------------------------------------------------
 .../plan/nodes/datastream/DataStreamScan.scala   |  6 ++++--
 .../table/plan/nodes/datastream/StreamScan.scala |  5 +----
 .../nodes/datastream/StreamTableSourceScan.scala | 19 ++++++++++---------
 .../datastream/StreamTableSourceScanRule.scala   |  6 +++++-
 4 files changed, 20 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
index 463e1bc..da83b64 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
@@ -35,11 +35,13 @@ class DataStreamScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    rowType: RelDataType)
-  extends StreamScan(cluster, traitSet, table, rowType) {
+    rowRelDataType: RelDataType)
+  extends StreamScan(cluster, traitSet, table) {
 
   val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
 
+  override def deriveRowType() = rowRelDataType
+
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamScan(
       cluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
index 17620d0..b13770e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
@@ -37,13 +37,10 @@ import scala.collection.JavaConverters._
 abstract class StreamScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    table: RelOptTable,
-    rowRelDataType: RelDataType)
+    table: RelOptTable)
   extends TableScan(cluster, traitSet, table)
   with DataStreamRel {
 
-  override def deriveRowType() = rowRelDataType
-
   protected def convertToExpectedType(
       input: DataStream[Any],
       flinkTable: FlinkTable[_],

http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 21b8a63..8201070 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -20,11 +20,10 @@ package org.apache.flink.api.table.plan.nodes.datastream
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
 import org.apache.flink.api.table.plan.schema.TableSourceTable
 import org.apache.flink.api.table.sources.StreamTableSource
+import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
 import org.apache.flink.streaming.api.datastream.DataStream
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
@@ -32,18 +31,20 @@ class StreamTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    rowType: RelDataType)
-  extends StreamScan(cluster, traitSet, table, rowType) {
+    tableSource: StreamTableSource[_])
+  extends StreamScan(cluster, traitSet, table) {
 
-  val tableSourceTable = table.unwrap(classOf[TableSourceTable])
-  val tableSource = tableSourceTable.tableSource.asInstanceOf[StreamTableSource[_]]
+  override def deriveRowType() = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+  }
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new StreamTableSourceScan(
       cluster,
       traitSet,
-      table,
-      rowType
+      getTable,
+      tableSource
     )
   }
 
@@ -55,7 +56,7 @@ class StreamTableSourceScan(
     val inputDataStream: DataStream[Any] = tableSource
       .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
 
-    convertToExpectedType(inputDataStream, tableSourceTable, expectedType, config)
+    convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index 9d8075c..91dd255 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -58,11 +58,15 @@ class StreamTableSourceScanRule
     val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
 
+    // The original registered table source
+    val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]
+
     new StreamTableSourceScan(
       rel.getCluster,
       traitSet,
       scan.getTable,
-      rel.getRowType
+      tableSource
     )
   }
 }
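
To make the shape of this refactoring explicit, here is a hypothetical, heavily simplified model of the before/after in plain Scala (no Flink or Calcite types; all names made up): the scan node now keeps the source instance it was constructed with, rather than re-unwrapping a TableSourceTable from the catalog table, and derives its row type from the source's own schema.

trait Source {
  def fieldNames: Array[String]
  def fieldTypes: Array[String]
}

// Before: the scan unwrapped its source from the catalog table, so it
// could only scan sources registered there. After: the rule extracts
// the source once and hands it to the scan directly.
class Scan(val source: Source) {
  // row type derived from the source itself (was: from a TableSourceTable)
  def deriveRowType(): Seq[(String, String)] =
    source.fieldNames.zip(source.fieldTypes).toSeq
}

object Example extends App {
  val src = new Source {
    val fieldNames = Array("word", "count")
    val fieldTypes = Array("String", "Int")
  }
  println(new Scan(src).deriveRowType()) // prints the (name, type) pairs
}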


[3/4] flink git commit: [FLINK-5257] [table] Include optimized logical plan in explain().

Posted by fh...@apache.org.
[FLINK-5257] [table] Include optimized logical plan in explain().

This closes #2949.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f9633cd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f9633cd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f9633cd

Branch: refs/heads/master
Commit: 6f9633cd9d3124e2abe5034806fd48cb130e2d93
Parents: ebe228d
Author: Kurt Young <yk...@gmail.com>
Authored: Tue Dec 6 09:23:22 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 6 11:23:23 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/BatchTableEnvironment.scala | 44 +++++++++++++-------
 .../api/table/StreamTableEnvironment.scala      | 44 +++++++++++++-------
 .../src/test/scala/resources/testFilter0.out    |  4 ++
 .../src/test/scala/resources/testFilter1.out    |  4 ++
 .../test/scala/resources/testFilterStream0.out  |  4 ++
 .../src/test/scala/resources/testJoin0.out      |  6 +++
 .../src/test/scala/resources/testJoin1.out      |  6 +++
 .../src/test/scala/resources/testUnion0.out     |  5 +++
 .../src/test/scala/resources/testUnion1.out     |  5 +++
 .../test/scala/resources/testUnionStream0.out   |  5 +++
 10 files changed, 96 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index 24b385c..918b01b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -26,9 +26,9 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{Programs, RuleSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
@@ -166,22 +166,25 @@ abstract class BatchTableEnvironment(
     * @param extended Flag to include detailed optimizer estimates.
     */
   private[flink] def explain(table: Table, extended: Boolean): String = {
-
-    val ast = RelOptUtil.toString(table.getRelNode)
-    val dataSet = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
+    val ast = table.getRelNode
+    val optimizedPlan = optimize(ast)
+    val dataSet = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
     dataSet.output(new DiscardingOutputFormat[Row])
     val env = dataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan
     val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
 
     s"== Abstract Syntax Tree ==" +
-    System.lineSeparator +
-    s"$ast" +
-    System.lineSeparator +
-    s"== Physical Execution Plan ==" +
-    System.lineSeparator +
-    s"$sqlPlan"
-
+        System.lineSeparator +
+        s"${RelOptUtil.toString(ast)}" +
+        System.lineSeparator +
+        s"== Optimized Logical Plan ==" +
+        System.lineSeparator +
+        s"${RelOptUtil.toString(optimizedPlan)}" +
+        System.lineSeparator +
+        s"== Physical Execution Plan ==" +
+        System.lineSeparator +
+        s"$sqlPlan"
   }
 
   /**
@@ -275,17 +278,27 @@ abstract class BatchTableEnvironment(
     * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
     *
     * @param table The root node of the relational expression tree.
-    * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
+    * @param tpe   The [[TypeInformation]] of the resulting [[DataSet]].
     * @tparam A The type of the resulting [[DataSet]].
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+    val dataSetPlan = optimize(table.getRelNode)
+    translate(dataSetPlan)
+  }
 
+  /**
+    * Translates a logical [[RelNode]] into a [[DataSet]].
+    *
+    * @param logicalPlan The root node of the relational expression tree.
+    * @param tpe         The [[TypeInformation]] of the resulting [[DataSet]].
+    * @tparam A The type of the resulting [[DataSet]].
+    * @return The [[DataSet]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
     validateType(tpe)
 
-    val dataSetPlan = optimize(table.getRelNode)
-
-    dataSetPlan match {
+    logicalPlan match {
       case node: DataSetRel =>
         node.translateToPlan(
           this,
@@ -294,5 +307,4 @@ abstract class BatchTableEnvironment(
       case _ => ???
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index bca8d79..8f00586 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -32,8 +32,8 @@ import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
+import org.apache.flink.api.table.plan.schema.{DataStreamTable, TableSourceTable}
 import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
-import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataStreamTable}
 import org.apache.flink.api.table.sources.StreamTableSource
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -291,12 +291,24 @@ abstract class StreamTableEnvironment(
     * @return The [[DataStream]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+    val dataStreamPlan = optimize(table.getRelNode)
+    translate(dataStreamPlan)
+  }
 
-    validateType(tpe)
+  /**
+    * Translates a logical [[RelNode]] into a [[DataStream]].
+    *
+    * @param logicalPlan The root node of the relational expression tree.
+    * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
+    * @tparam A The type of the resulting [[DataStream]].
+    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A]
+      (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
 
-   val dataStreamPlan = optimize(table.getRelNode)
+    validateType(tpe)
 
-    dataStreamPlan match {
+    logicalPlan match {
       case node: DataStreamRel =>
         node.translateToPlan(
           this,
@@ -304,7 +316,6 @@ abstract class StreamTableEnvironment(
         ).asInstanceOf[DataStream[A]]
       case _ => ???
     }
-
   }
 
   /**
@@ -314,10 +325,9 @@ abstract class StreamTableEnvironment(
     * @param table The table for which the AST and execution plan will be returned.
     */
   def explain(table: Table): String = {
-
-    val ast = RelOptUtil.toString(table.getRelNode)
-
-    val dataStream = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
+    val ast = table.getRelNode
+    val optimizedPlan = optimize(ast)
+    val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
 
     val env = dataStream.getExecutionEnvironment
     val jsonSqlPlan = env.getExecutionPlan
@@ -325,12 +335,16 @@ abstract class StreamTableEnvironment(
     val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
 
     s"== Abstract Syntax Tree ==" +
-      System.lineSeparator +
-      s"$ast" +
-      System.lineSeparator +
-      s"== Physical Execution Plan ==" +
-      System.lineSeparator +
-      s"$sqlPlan"
+        System.lineSeparator +
+        s"${RelOptUtil.toString(ast)}" +
+        System.lineSeparator +
+        s"== Optimized Logical Plan ==" +
+        System.lineSeparator +
+        s"${RelOptUtil.toString(optimizedPlan)}" +
+        System.lineSeparator +
+        s"== Physical Execution Plan ==" +
+        System.lineSeparator +
+        s"$sqlPlan"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
index b3786d9..b6ea86f 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
@@ -2,6 +2,10 @@
 LogicalFilter(condition=[=(MOD($0, 2), 0)])
   LogicalTableScan(table=[[_DataSetTable_0]])
 
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
+  DataSetScan(table=[[_DataSetTable_0]])
+
 == Physical Execution Plan ==
 Stage 3 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
index 1049466..719edd9 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
@@ -2,6 +2,10 @@
 LogicalFilter(condition=[=(MOD($0, 2), 0)])
   LogicalTableScan(table=[[_DataSetTable_0]])
 
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
+  DataSetScan(table=[[_DataSetTable_0]])
+
 == Physical Execution Plan ==
 Stage 3 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
index 20ae2b1..022f6c9 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
@@ -2,6 +2,10 @@
 LogicalFilter(condition=[=(MOD($0, 2), 0)])
   LogicalTableScan(table=[[_DataStreamTable_0]])
 
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
+  DataStreamScan(table=[[_DataStreamTable_0]])
+
 == Physical Execution Plan ==
 Stage 1 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
index 11961ef..4f091f6 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
@@ -5,6 +5,12 @@ LogicalProject(a=[$0], c=[$2])
       LogicalTableScan(table=[[_DataSetTable_0]])
       LogicalTableScan(table=[[_DataSetTable_1]])
 
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, c])
+  DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
+    DataSetScan(table=[[_DataSetTable_0]])
+    DataSetScan(table=[[_DataSetTable_1]])
+
 == Physical Execution Plan ==
 Stage 4 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
index c6e8b34..e9dad57 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
@@ -5,6 +5,12 @@ LogicalProject(a=[$0], c=[$2])
       LogicalTableScan(table=[[_DataSetTable_0]])
       LogicalTableScan(table=[[_DataSetTable_1]])
 
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, c])
+  DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
+    DataSetScan(table=[[_DataSetTable_0]])
+    DataSetScan(table=[[_DataSetTable_1]])
+
 == Physical Execution Plan ==
 Stage 4 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
index d17517f..5fbd1b5 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
@@ -3,6 +3,11 @@ LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataSetTable_0]])
   LogicalTableScan(table=[[_DataSetTable_1]])
 
+== Optimized Logical Plan ==
+DataSetUnion(union=[count, word])
+  DataSetScan(table=[[_DataSetTable_0]])
+  DataSetScan(table=[[_DataSetTable_1]])
+
 == Physical Execution Plan ==
 Stage 3 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
index 875f77b..d7d343b 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
@@ -3,6 +3,11 @@ LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataSetTable_0]])
   LogicalTableScan(table=[[_DataSetTable_1]])
 
+== Optimized Logical Plan ==
+DataSetUnion(union=[count, word])
+  DataSetScan(table=[[_DataSetTable_0]])
+  DataSetScan(table=[[_DataSetTable_1]])
+
 == Physical Execution Plan ==
 Stage 3 : Data Source
 	content : collect elements with CollectionInputFormat

http://git-wip-us.apache.org/repos/asf/flink/blob/6f9633cd/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
index ac3635d..fc83c0d 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -3,6 +3,11 @@ LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataStreamTable_0]])
   LogicalTableScan(table=[[_DataStreamTable_1]])
 
+== Optimized Logical Plan ==
+DataStreamUnion(union=[count, word])
+  DataStreamScan(table=[[_DataStreamTable_0]])
+  DataStreamScan(table=[[_DataStreamTable_1]])
+
 == Physical Execution Plan ==
 Stage 1 : Data Source
 	content : collect elements with CollectionInputFormat
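
For anyone trying this out, a minimal sketch of how the new section shows up, assuming the Table API entry points of this era (table and field names are made up; the query mirrors the testFilter*.out resources above):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment

object ExplainExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    tEnv.registerDataSet("words", env.fromElements((1, "hello"), (2, "world")), 'a, 'b)
    val table = tEnv.sql("SELECT a, b FROM words WHERE MOD(a, 2) = 0")

    // explain() now emits three sections: == Abstract Syntax Tree ==,
    // == Optimized Logical Plan ==, and == Physical Execution Plan ==
    println(tEnv.explain(table))
  }
}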


[2/4] flink git commit: [FLINK-5259] [docs] Fix wrong execution environment in batch retry delays example.

Posted by fh...@apache.org.
[FLINK-5259] [docs] Fix wrong execution environment in batch retry delays example.

This closes #2943.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ebe228db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ebe228db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ebe228db

Branch: refs/heads/master
Commit: ebe228db74ede9bccc9b3c482b98acbb397a9130
Parents: 96239b3
Author: David Anderson <da...@alpinegizmo.com>
Authored: Mon Dec 5 11:39:46 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 6 10:53:39 2016 +0100

----------------------------------------------------------------------
 docs/dev/batch/fault_tolerance.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ebe228db/docs/dev/batch/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/fault_tolerance.md b/docs/dev/batch/fault_tolerance.md
index ab870d0..f403791 100644
--- a/docs/dev/batch/fault_tolerance.md
+++ b/docs/dev/batch/fault_tolerance.md
@@ -77,13 +77,13 @@ You can set the retry delay for each program as follows (the sample shows the Da
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val env = ExecutionEnvironment.getExecutionEnvironment()
 env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay
 {% endhighlight %}
 </div>
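
And for completeness, a minimal sketch of the corrected batch setup with both the retry count and the retry delay configured. To the best of my understanding, setNumberOfExecutionRetries is the companion setter documented on the same page; treat it as an assumption beyond what this patch shows.

import org.apache.flink.api.scala._

object RetryDelayExample {
  def main(args: Array[String]): Unit = {
    // Batch jobs configure retries on ExecutionEnvironment;
    // StreamExecutionEnvironment (the bug fixed above) is for streaming jobs.
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setNumberOfExecutionRetries(3)          // retry a failed job up to 3 times
    env.getConfig.setExecutionRetryDelay(5000)  // wait 5000 ms between attempts

    env.fromElements(1, 2, 3).map(_ * 2).print()
  }
}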