Posted to commits@flink.apache.org by fh...@apache.org on 2017/01/19 22:52:33 UTC

[1/3] flink git commit: [FLINK-5520] [table] Disable outer joins with non-equality predicates.

Repository: flink
Updated Branches:
  refs/heads/master 4833e74e7 -> d1301c82b


[FLINK-5520] [table] Disable outer joins with non-equality predicates.

This closes #3141.
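
Outer joins whose join condition contains non-equality join predicates
(e.g. a > d) or local predicates that touch only one input (e.g. b > 3)
are rejected from now on, because the DataSet translation does not yet
handle them correctly; inner joins with such predicates remain supported.
For illustration (a sketch; the field names are examples rather than
taken from the patch):

    // rejected after this change: outer join with a non-equi predicate
    ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h)   // => ValidationException
    // still supported: the same predicates on an inner join
    ds1.join(ds2).where('a === 'd && 'b < 'h)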


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1301c82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1301c82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1301c82

Branch: refs/heads/master
Commit: d1301c82b85c00284d90e8f5bdac4fd86dc5b173
Parents: 0ea996a
Author: lincoln-lil <li...@gmail.com>
Authored: Tue Jan 17 22:42:39 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jan 19 23:52:00 2017 +0100

----------------------------------------------------------------------
 .../flink/table/plan/logical/operators.scala    |  52 ++++++---
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   3 +-
 .../plan/rules/dataSet/DataSetJoinRule.scala    |   7 +-
 .../table/api/scala/batch/sql/JoinITCase.scala  | 107 ++++++++++++++++++-
 .../api/scala/batch/table/JoinITCase.scala      |  89 +++++++++++++--
 .../stringexpr/JoinStringExpressionTest.scala   |  18 ----
 6 files changed, 228 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 743bdfe..3ba0285 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -442,7 +442,7 @@ case class Join(
 
     val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
     if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
-      failValidation(s"Filter operator requires a boolean expression as input, " + 
+      failValidation(s"Filter operator requires a boolean expression as input, " +
         s"but ${resolvedJoin.condition} is of type ${resolvedJoin.joinType}")
     } else if (ambiguousName.nonEmpty) {
       failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
@@ -454,30 +454,54 @@ case class Join(
 
   private def testJoinCondition(expression: Expression): Unit = {
 
-    def checkIfJoinCondition(exp : BinaryComparison) = exp.children match {
-        case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil
-          if x.isFromLeftInput != y.isFromLeftInput => Unit
-        case x => failValidation(
-          s"Invalid non-join predicate $exp. For non-join predicates use Table#where.")
-      }
+    def checkIfJoinCondition(exp: BinaryComparison) = exp.children match {
+      case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
+        if x.isFromLeftInput != y.isFromLeftInput => true
+      case _ => false
+    }
+
+    def checkIfFilterCondition(exp: BinaryComparison) = exp.children match {
+      case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil => false
+      case (x: JoinFieldReference) :: (_) :: Nil => true
+      case (_) :: (y: JoinFieldReference) :: Nil => true
+      case _ => false
+    }
+
+    var equiJoinPredicateFound = false
+    var nonEquiJoinPredicateFound = false
+    var localPredicateFound = false
 
-    var equiJoinFound = false
     def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
       case x: And => x.children.foreach(validateConditions(_, isAndBranch))
       case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false))
       case x: EqualTo =>
-        if (isAndBranch) {
-          equiJoinFound = true
+        if (isAndBranch && checkIfJoinCondition(x)) {
+          equiJoinPredicateFound = true
+        }
+        if (checkIfFilterCondition(x)) {
+          localPredicateFound = true
         }
-        checkIfJoinCondition(x)
-      case x: BinaryComparison => checkIfJoinCondition(x)
+      case x: BinaryComparison => {
+        if (checkIfFilterCondition(x)) {
+          localPredicateFound = true
+        } else {
+          nonEquiJoinPredicateFound = true
+        }
+      }
       case x => failValidation(
         s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x")
     }
 
     validateConditions(expression, isAndBranch = true)
-    if (!equiJoinFound) {
-      failValidation(s"Invalid join condition: $expression. At least one equi-join required.")
+    if (!equiJoinPredicateFound) {
+      failValidation(
+        s"Invalid join condition: $expression. At least one equi-join predicate is " +
+          s"required.")
+    }
+    if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
+      failValidation(
+        s"Invalid join condition: $expression. Non-equality join predicates or local" +
+          s" predicates are not supported in outer joins.")
     }
   }
 }
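
The validation in Join.validate now classifies every binary comparison in
the join condition into one of three buckets: equi-join predicates (an
equality between a field of the left and a field of the right input),
other join predicates (non-equality comparisons spanning both inputs), and
local predicates (comparisons where exactly one side is a field reference).
At least one equi-join predicate under a conjunction is required, and for
outer joins the other two buckets must stay empty. A self-contained sketch
of the same classification over a toy expression tree (the ADT below is
illustrative, not Flink's Expression types):

    sealed trait Expr
    case class Field(fromLeft: Boolean) extends Expr          // input field reference
    case class Lit(v: Any) extends Expr                       // literal leaf
    case class Cmp(isEquality: Boolean, l: Expr, r: Expr) extends Expr
    case class And(l: Expr, r: Expr) extends Expr
    case class Or(l: Expr, r: Expr) extends Expr

    def validateJoinCondition(cond: Expr, isInnerJoin: Boolean): Unit = {
      def isJoinPred(l: Expr, r: Expr) = (l, r) match {
        case (Field(x), Field(y)) => x != y                   // fields of different inputs
        case _                    => false
      }
      def isLocalPred(l: Expr, r: Expr) = (l, r) match {
        case (Field(_), Field(_))          => false           // both sides are fields
        case (Field(_), _) | (_, Field(_)) => true            // exactly one side is a field
        case _                             => false
      }
      var equi = false; var nonEqui = false; var local = false
      def go(e: Expr, andBranch: Boolean): Unit = e match {
        case And(l, r) => go(l, andBranch); go(r, andBranch)
        case Or(l, r)  => go(l, andBranch = false); go(r, andBranch = false)
        case Cmp(true, l, r) =>                               // equality comparison
          if (andBranch && isJoinPred(l, r)) equi = true
          if (isLocalPred(l, r)) local = true
        case Cmp(false, l, r) =>                              // <, >, <=, >=, <>
          if (isLocalPred(l, r)) local = true else nonEqui = true
        case other => sys.error(s"Unsupported condition: $other")
      }
      go(cond, andBranch = true)
      require(equi, "At least one equi-join predicate is required.")
      require(isInnerJoin || (!nonEqui && !local),
        "Non-equality join predicates or local predicates are not supported in outer joins.")
    }

    // 'b === 'e && 'a > 'd passes for inner joins but throws for outer joins;
    // a local predicate such as 'b > 3 maps to Cmp(false, Field(true), Lit(3))
    validateJoinCondition(
      And(Cmp(true, Field(true), Field(false)),
          Cmp(false, Field(true), Field(false))),
      isInnerJoin = true)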

http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index 324e949..edb5be2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -31,7 +31,8 @@ import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.FlatJoinRunner
 import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
 import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.calcite.rex.RexNode
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
 import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
 
 import scala.collection.JavaConversions._

http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
index 2874198..3f49c6f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
@@ -21,9 +21,9 @@ package org.apache.flink.table.plan.rules.dataSet
 import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.logical.LogicalJoin
-
-import org.apache.flink.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention}
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetJoin}
 
 import scala.collection.JavaConversions._
 
@@ -40,7 +40,8 @@ class DataSetJoinRule
     val joinInfo = join.analyzeCondition
 
     // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
-    !joinInfo.pairs().isEmpty
+    // and disable outer joins with non-equality predicates (see FLINK-5520)
+    !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
   }
 
   override def convert(rel: RelNode): RelNode = {
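
SQL queries do not pass through the Table API validation above, so the
same restriction is enforced during optimization: DataSetJoinRule refuses
to match unless the condition contains at least one equi pair and, for
anything other than an inner join, nothing beyond those pairs
(joinInfo.isEqui). A sketch of the guard as a standalone predicate (the
function name is illustrative):

    // translate the join only if there is at least one equi pair and,
    // unless it is an inner join, the condition is purely equi
    def canTranslate(hasEquiPair: Boolean, isFullyEqui: Boolean, isInnerJoin: Boolean): Boolean =
      hasEquiPair && (isFullyEqui || isInnerJoin)

If the rule never matches, no DataSet plan exists for the join and the
optimizer fails with a TableException, which is what the SQL tests below
assert.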

http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
index d07c282..9df17ad 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
@@ -78,7 +78,7 @@ class JoinITCase(
   }
 
   @Test
-  def testJoinWithJoinFilter(): Unit = {
+  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -92,8 +92,7 @@ class JoinITCase(
 
     val result = tEnv.sql(sqlQuery)
 
-    val expected = "Hello world, how are you?,Hallo Welt wie\n" +
-      "I am fine.,Hallo Welt wie\n"
+    val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -375,4 +374,106 @@ class JoinITCase(
 
     Assert.assertEquals(0, result)
   }
+
+  @Test(expected = classOf[TableException])
+  def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFullOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRightOuterJoinWithLocalPredicate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testLeftOuterJoinWithLocalPredicate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFullOuterJoinWithLocalPredicate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
 }
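
The SQL tests expect a TableException rather than a ValidationException:
a SQL query goes straight to the planner, so the invalid outer join is
only detected when DataSetJoinRule declines to match and plan generation
fails. A hedged usage sketch, assuming the Table3/Table5 registrations
from the tests above:

    val query = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e AND a > d"
    try {
      tEnv.sql(query).toDataSet[Row].collect()
    } catch {
      // expected: the optimizer cannot generate a valid execution plan
      case e: TableException => println(s"rejected: ${e.getMessage}")
    }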

http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
index 3305949..5993728 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
@@ -78,11 +78,26 @@ class JoinITCase(
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
+    val joinT = ds1.join(ds2).where('b === 'e && 'a < 6).select('c, 'g)
+
+    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
+      "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
     val joinT = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
 
-    val expected = "Hello world, how are you?,Hallo Welt wie\n" +
-      "I am fine.,Hallo Welt wie\n"
     val results = joinT.toDataSet[Row].collect()
+    val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -97,7 +112,7 @@ class JoinITCase(
     val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
 
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
+    "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
     val results = joinT.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -234,8 +249,8 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
-  def testRightJoinWithNotOnlyEquiJoin(): Unit = {
+  @Test(expected = classOf[ValidationException])
+  def testLeftJoinWithNonEquiJoinPredicate(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
@@ -243,11 +258,67 @@ class JoinITCase(
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
-    val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+    ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
+  }
 
-    val expected = "Hello world,BCD\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  @Test(expected = classOf[ValidationException])
+  def testFullJoinWithNonEquiJoinPredicate(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testRightJoinWithNonEquiJoinPredicate(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testLeftJoinWithLocalPredicate(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFullJoinWithLocalPredicate(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testRightJoinWithLocalPredicate(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
   }
 
   @Test
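
The Table API tests expect a ValidationException instead, because
Join.validate (see operators.scala above) inspects the condition while
the logical plan is being constructed, before any optimization runs. A
sketch, assuming the same ds1/ds2 tables as in the tests above:

    try {
      // a local predicate ('b < 3) on an outer join fails eagerly
      ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g)
    } catch {
      case e: ValidationException => println(s"rejected: ${e.getMessage}")
    }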

http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
index 025cda9..b2f683c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
@@ -238,24 +238,6 @@ class JoinStringExpressionTest {
   }
 
   @Test
-  def testRightJoinWithNotOnlyEquiJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val t1 = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
-    val t2 = ds1.rightOuterJoin(ds2, "a = d && b < h").select("c, g")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
   def testFullOuterJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)


[2/3] flink git commit: [FLINK-5452] [table] Fix SortITCase which fails under cluster mode.

Posted by fh...@apache.org.
[FLINK-5452] [table] Fix SortITCase which fails under cluster mode.

This closes #3095.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ea996a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ea996a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ea996a6

Branch: refs/heads/master
Commit: 0ea996a64ca0ff9589ffcd5b89967f51aee1ffe6
Parents: 0c6e0ee
Author: Kurt Young <yk...@gmail.com>
Authored: Thu Jan 19 16:31:24 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jan 19 23:52:00 2017 +0100

----------------------------------------------------------------------
 .../table/api/scala/batch/sql/SortITCase.scala  | 81 +++++++++++-------
 .../api/scala/batch/table/SortITCase.scala      | 88 ++++++++++++--------
 2 files changed, 107 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ea996a6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
index 43847dc..c577797 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
@@ -18,16 +18,15 @@
 
 package org.apache.flink.table.api.scala.batch.sql
 
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.scala._
-import org.apache.flink.api.scala._
-import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -35,77 +34,95 @@ import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class SortITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
+class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) {
+
+  private def getExecutionEnvironment = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // set the parallelism explicitly to make sure the query is executed in
+    // a distributed manner
+    env.setParallelism(3)
+    env
+  }
 
   @Test
   def testOrderByMultipleFieldsWithSql(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
+    val env = getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"
 
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       (- x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
     val expected = sortExpectedly(tupleDataSetStrings)
+    // squash all rows inside a partition into one element
     val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
+    def rowOrdering = Ordering.by((r : Row) => {
+      // the ordering of this tuple matches the previously defined tupleOrdering,
+      // so we simply return the fields in their declaration order
+      (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
+    })
+
     val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
+        .filterNot(_.isEmpty)
+        // sort all partitions by their head element to verify the order across partitions
+        .sortBy(_.head)(rowOrdering)
+        .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
   @Test
   def testOrderByWithOffset(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
+    val env = getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS"
 
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       - x.productElement(0).asInstanceOf[Int] )
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
     val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
+    // squash all rows inside a partition into one element
     val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
     val result = results.
-      filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
+        filterNot(_.isEmpty)
+        // sort all partitions by their head element to verify the order across partitions
+        .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
+        .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
   @Test
   def testOrderByWithOffsetAndFetch(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
+    val env = getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY"
 
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       x.productElement(0).asInstanceOf[Int] )
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
     val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
+    // squash all rows inside a partition into one element
     val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
     val result = results
       .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
+        // sort all partitions by their head element to verify the order across partitions
+      .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
       .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
@@ -113,31 +130,39 @@ class SortITCase(
 
   @Test
   def testOrderByLimit(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
+    val env = getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable ORDER BY _2, _1 LIMIT 5"
 
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       (x.productElement(1).asInstanceOf[Long], x.productElement(0).asInstanceOf[Int]) )
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
     val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+    // squash all rows inside a partition into one element
     val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
+    def rowOrdering = Ordering.by((r : Row) => {
+      // the ordering of this tuple matches the previously defined tupleOrdering,
+      // so we simply return the fields in their declaration order
+      (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
+    })
+
     val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
+        .filterNot(_.isEmpty)
+        // sort all partitions by their head element to verify the order across partitions
+        .sortBy(_.head)(rowOrdering)
+        .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
   @Test(expected = classOf[TableException])
   def testLimitWithoutOrder(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
+    val env = getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
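
Both SortITCase files (the SQL one above and the Table API one below) now
verify the global order without assuming that collect() returns rows in
order: each partition is squashed into one Seq via mapPartition, empty
partitions are dropped, the partitions are ordered by their head element
(a range-partitioned sort gives each partition a disjoint key range), and
the pieces are concatenated before comparing against the expected output.
The previous code ordered partition heads by toString, which breaks for
numeric keys (e.g. "10" sorts before "2"). A minimal self-contained sketch
of the pattern, assuming field 0 holds an Int sort key:

    import org.apache.flink.api.scala._
    import org.apache.flink.types.Row

    def globallyOrdered(sorted: DataSet[Row]): Seq[Row] = {
      // keep partition boundaries visible after collect(): one Seq per partition
      val partitions = sorted.mapPartition(rows => Seq(rows.toSeq)).collect()
      partitions
        .filterNot(_.isEmpty)
        // partitions hold disjoint key ranges, so ordering them by their
        // first element restores the total order
        .sortBy(_.head)(Ordering.by((r: Row) => r.getField(0).asInstanceOf[Int]))
        .reduceLeft(_ ++ _)
    }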

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea996a6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
index 6fe7624..a84d8a9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
@@ -18,15 +18,15 @@
 
 package org.apache.flink.table.api.scala.batch.table
 
-import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.types.Row
 import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -34,13 +34,13 @@ import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class SortITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
+class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) {
 
-  def getExecutionEnvironment = {
+  private def getExecutionEnvironment = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(4)
+    // set the parallelism explicitly to make sure the query is executed in
+    // a distributed manner
+    env.setParallelism(3)
     env
   }
 
@@ -51,16 +51,18 @@ class SortITCase(
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val t = ds.toTable(tEnv).orderBy('_1.desc)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       - x.productElement(0).asInstanceOf[Int] )
 
     val expected = sortExpectedly(tupleDataSetStrings)
+    // squash all rows inside a partition into one element
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
     val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
+        .filterNot(_.isEmpty)
+        // sort all partitions by their head element to verify the order across partitions
+        .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
+        .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -72,16 +74,18 @@ class SortITCase(
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val t = ds.toTable(tEnv).orderBy('_1.asc)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       x.productElement(0).asInstanceOf[Int] )
 
     val expected = sortExpectedly(tupleDataSetStrings)
+    // squash all rows inside a partition into one element
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
     val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
+        .filterNot(_.isEmpty)
+        // sort all partitions by their head element to verify the order across partitions
+        .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
+        .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -93,16 +97,24 @@ class SortITCase(
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val t = ds.toTable(tEnv).orderBy('_2.asc, '_1.desc)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       (x.productElement(1).asInstanceOf[Long], - x.productElement(0).asInstanceOf[Int]) )
 
     val expected = sortExpectedly(tupleDataSetStrings)
+    // squash all rows inside a partition into one element
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
+    def rowOrdering = Ordering.by((r : Row) => {
+      // the ordering of this tuple matches the previously defined tupleOrdering,
+      // so we simply return the fields in their declaration order
+      (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
+    })
+
     val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
+        .filterNot(_.isEmpty)
+        // sort all partitions by their head element to verify the order across partitions
+        .sortBy(_.head)(rowOrdering)
+        .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -114,16 +126,18 @@ class SortITCase(
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       x.productElement(0).asInstanceOf[Int] )
 
     val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
+    // squash all rows inside a partition into one element
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
     val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
+        .filterNot(_.isEmpty)
+        // sort all partitions by their head element to verify the order across partitions
+        .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
+        .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -135,16 +149,18 @@ class SortITCase(
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       - x.productElement(0).asInstanceOf[Int] )
 
     val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
+    // squash all rows inside a partition into one element
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
     val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
+        .filterNot(_.isEmpty)
+        // sort all partitions by their head element to verify the order across partitions
+        .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
+        .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
@@ -156,16 +172,20 @@ class SortITCase(
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       x.productElement(0).asInstanceOf[Int] )
 
     val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+    // squash all rows inside a partition into one element
     val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
 
+    implicit def rowOrdering = Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int])
+
     val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
+        .filterNot(_.isEmpty)
+        // sort all partitions by their head element to verify the order across partitions
+        .sortBy(_.head)
+        .reduceLeft(_ ++ _)
 
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }


[3/3] flink git commit: [hotfix] [docs] Add closing parentheses on "DataStream API Programming Guide".

Posted by fh...@apache.org.
[hotfix] [docs] Add closing parentheses on "DataStream API Programming Guide".

This closes #3153.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c6e0ee9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c6e0ee9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c6e0ee9

Branch: refs/heads/master
Commit: 0c6e0ee9c15628dcdf76fd7ce4dae4614f589ce4
Parents: 4833e74
Author: Keiji Yoshida <ke...@gmail.com>
Authored: Wed Jan 18 21:01:36 2017 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jan 19 23:52:00 2017 +0100

----------------------------------------------------------------------
 docs/dev/datastream_api.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c6e0ee9/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 850d8c5..4fc79dd 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -348,7 +348,7 @@ windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
     public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
         return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
     }
-};
+});
     {% endhighlight %}
           </td>
         </tr>
@@ -363,7 +363,7 @@ windowedStream.fold("start", new FoldFunction<Integer, String>() {
     public String fold(String current, Integer value) {
         return current + "-" + value;
     }
-};
+});
     {% endhighlight %}
           </td>
         </tr>