Posted to commits@flink.apache.org by fh...@apache.org on 2017/09/05 12:22:40 UTC

[1/5] flink git commit: [FLINK-7404] [table] Generate code for non-equi join conditions only.

Repository: flink
Updated Branches:
  refs/heads/master ba03b78c7 -> 7c11bd7f4


[FLINK-7404] [table] Generate code for non-equi join conditions only.

This closes #4507.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cf737c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cf737c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cf737c1

Branch: refs/heads/master
Commit: 4cf737c1c12a4d61e9388992ee96875c43faa410
Parents: 0eef8e8
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Aug 9 17:41:10 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 5 13:53:59 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4cf737c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index 1583e31..acbf94d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -186,7 +186,8 @@ class DataSetJoin(
            |""".stripMargin
     }
     else {
-      val condition = generator.generateExpression(joinCondition)
+      val nonEquiPredicates = joinInfo.getRemaining(this.cluster.getRexBuilder)
+      val condition = generator.generateExpression(nonEquiPredicates)
       body = s"""
            |${condition.code}
            |if (${condition.resultTerm}) {
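
For context, the fix relies on Calcite's JoinInfo, which splits a join condition into equi-join keys and a remaining (non-equi) predicate; only the remaining part needs generated code, because the equi part is already enforced by the join keys. A minimal sketch of that split (illustrative only; the helper name splitJoinCondition is ours, not part of the patch):

import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.JoinInfo
import org.apache.calcite.rex.{RexBuilder, RexNode}

import scala.collection.JavaConverters._

// Splits a join condition into (left equi-key indexes, right equi-key indexes, non-equi rest).
def splitJoinCondition(
    left: RelNode,
    right: RelNode,
    condition: RexNode,
    rexBuilder: RexBuilder): (Seq[Int], Seq[Int], RexNode) = {
  val joinInfo = JoinInfo.of(left, right, condition)
  // getRemaining() returns the non-equi predicates (a literal TRUE for a pure equi-join),
  // which is what the patch now passes to generateExpression().
  (joinInfo.leftKeys.asScala.map(_.intValue),
   joinInfo.rightKeys.asScala.map(_.intValue),
   joinInfo.getRemaining(rexBuilder))
}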


[2/5] flink git commit: [FLINK-7572] [table] Improve TableSchema and FlinkTable validation exception messages.

Posted by fh...@apache.org.
[FLINK-7572] [table] Improve TableSchema and FlinkTable validation exception messages.

This closes #4640.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0eef8e8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0eef8e8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0eef8e8c

Branch: refs/heads/master
Commit: 0eef8e8c01041bb0c001282a16c43ea54f859cfa
Parents: b7b0d40
Author: sunjincheng121 <su...@gmail.com>
Authored: Tue Sep 5 08:55:03 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 5 13:53:59 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/table/api/TableSchema.scala    | 15 +++++++-
 .../flink/table/plan/schema/FlinkTable.scala    | 15 +++++++-
 .../validation/FlinkTableValidationTest.scala   | 39 ++++++++++++++++++++
 .../validation/TableSchemaValidationTest.scala  | 27 +++++++++++++-
 4 files changed, 90 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0eef8e8c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
index a67a07a..6ee65f0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -28,13 +28,24 @@ class TableSchema(
 
   if (columnNames.length != columnTypes.length) {
     throw new TableException(
-      "Number of column indexes and column names must be equal.")
+      s"Number of field names and field types must be equal.\n" +
+        s"Number of names is ${columnNames.length}, number of types is ${columnTypes.length}.\n" +
+        s"List of field names: ${columnNames.mkString("[", ", ", "]")}.\n" +
+        s"List of field types: ${columnTypes.mkString("[", ", ", "]")}.")
   }
 
   // check uniqueness of field names
   if (columnNames.toSet.size != columnTypes.length) {
+    val duplicateFields = columnNames
+      // count occurrences of field names
+      .groupBy(identity).mapValues(_.length)
+      // filter for occurrences > 1 and map to field name
+      .filter(g => g._2 > 1).keys
+
     throw new TableException(
-      "Table column names must be unique.")
+      s"Field names must be unique.\n" +
+        s"List of duplicate fields: ${duplicateFields.mkString("[", ", ", "]")}.\n" +
+        s"List of all fields: ${columnNames.mkString("[", ", ", "]")}.")
   }
 
   val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap
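
As an aside, the duplicate-name detection used in the new error message boils down to a small, reusable idiom; a standalone sketch (the helper name duplicateNames is ours):

// Returns the field names that occur more than once.
def duplicateNames(fieldNames: Seq[String]): Seq[String] =
  fieldNames
    .groupBy(identity)          // Map(name -> all occurrences of that name)
    .filter(_._2.length > 1)    // keep only names that occur more than once
    .keys
    .toSeq

// duplicateNames(Seq("a", "a", "b"))  // -> Seq("a")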

http://git-wip-us.apache.org/repos/asf/flink/blob/0eef8e8c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
index df56ae6..c76532f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
@@ -37,13 +37,24 @@ abstract class FlinkTable[T](
 
   if (fieldIndexes.length != fieldNames.length) {
     throw new TableException(
-      "Number of field indexes and field names must be equal.")
+      s"Number of field names and field indexes must be equal.\n" +
+        s"Number of names is ${fieldNames.length}, number of indexes is ${fieldIndexes.length}.\n" +
+        s"List of column names: ${fieldNames.mkString("[", ", ", "]")}.\n" +
+        s"List of column indexes: ${fieldIndexes.mkString("[", ", ", "]")}.")
   }
 
   // check uniqueness of field names
   if (fieldNames.length != fieldNames.toSet.size) {
+    val duplicateFields = fieldNames
+      // count occurrences of field names
+      .groupBy(identity).mapValues(_.length)
+      // filter for occurrences > 1 and map to field name
+      .filter(g => g._2 > 1).keys
+
     throw new TableException(
-      "Table field names must be unique.")
+      s"Field names must be unique.\n" +
+        s"List of duplicate fields: ${duplicateFields.mkString("[", ", ", "]")}.\n" +
+        s"List of all fields: ${fieldNames.mkString("[", ", ", "]")}.")
   }
 
   val fieldTypes: Array[TypeInformation[_]] =

http://git-wip-us.apache.org/repos/asf/flink/blob/0eef8e8c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
new file mode 100644
index 0000000..a845f5c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class FlinkTableValidationTest extends TableTestBase {
+
+  @Test
+  def testFieldNamesDuplicate() {
+
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage("Field names must be unique.\n" +
+      "List of duplicate fields: [a].\n" +
+      "List of all fields: [a, a, b].")
+
+    val util = batchTestUtil()
+    util.addTable[(Int, Int, String)]("MyTable", 'a, 'a, 'b)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0eef8e8c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
index 1a7815a..c430e59 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSchemaValidationTest.scala
@@ -24,12 +24,35 @@ import org.junit.Test
 
 class TableSchemaValidationTest extends TableTestBase {
 
-  @Test(expected = classOf[TableException])
-  def testInvalidSchema() {
+  @Test
+  def testColumnNameAndColumnTypeNotEqual() {
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage(
+      "Number of field names and field types must be equal.\n" +
+        "Number of names is 3, number of types is 2.\n" +
+        "List of field names: [a, b, c].\n" +
+        "List of field types: [Integer, String].")
+
     val fieldNames = Array("a", "b", "c")
     val typeInfos: Array[TypeInformation[_]] = Array(
       BasicTypeInfo.INT_TYPE_INFO,
       BasicTypeInfo.STRING_TYPE_INFO)
     new TableSchema(fieldNames, typeInfos)
   }
+
+  @Test
+  def testColumnNamesDuplicate() {
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage(
+      "Field names must be unique.\n" +
+        "List of duplicate fields: [a].\n" +
+        "List of all fields: [a, a, c].")
+
+    val fieldNames = Array("a", "a", "c")
+    val typeInfos: Array[TypeInformation[_]] = Array(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO)
+    new TableSchema(fieldNames, typeInfos)
+  }
 }


[4/5] flink git commit: [FLINK-7564] [table] Fix watermark semantics in rowtime unbounded OVER window.

Posted by fh...@apache.org.
[FLINK-7564] [table] Fix watermark semantics in rowtime unbounded OVER window.

This closes #4633.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7b0d400
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7b0d400
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7b0d400

Branch: refs/heads/master
Commit: b7b0d400a8e630eb81d7ec51f112a4ded5c1b03f
Parents: 09344aa
Author: Xingcan Cui <xi...@gmail.com>
Authored: Fri Sep 1 09:16:21 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 5 13:53:59 2017 +0200

----------------------------------------------------------------------
 .../flink/table/runtime/aggregate/RowTimeUnboundedOver.scala   | 2 +-
 .../flink/table/runtime/harness/OverWindowHarnessTest.scala    | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7b0d400/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index c8236a3..27d307b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -114,7 +114,7 @@ abstract class RowTimeUnboundedOver(
     val curWatermark = ctx.timerService().currentWatermark()
 
     // discard late record
-    if (timestamp >= curWatermark) {
+    if (timestamp > curWatermark) {
       // ensure every key just registers one timer
       ctx.timerService.registerEventTimeTimer(curWatermark + 1)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b0d400/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index ba36e18..def1972 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -712,6 +712,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
+      CRow(Row.of(20000L: JLong, "ccc", 1L: JLong), change = true))) // test for late data
+
+    testHarness.processElement(new StreamRecord(
       CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
@@ -845,6 +848,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
+      CRow(Row.of(20000L: JLong, "ccc", 2L: JLong), change = true))) // test for late data
+
+    testHarness.processElement(new StreamRecord(
       CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(


[3/5] flink git commit: [FLINK-6751] [docs] Add documentation for user-defined AggregateFunction.

Posted by fh...@apache.org.
[FLINK-6751] [docs] Add documentation for user-defined AggregateFunction.

This closes #4546.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09344aa2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09344aa2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09344aa2

Branch: refs/heads/master
Commit: 09344aa2dc36b9b3ea4c5b7573ff532e26f9b0dd
Parents: ba03b78
Author: shaoxuan-wang <sh...@apache.org>
Authored: Wed Aug 16 00:00:12 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 5 13:53:59 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/udfs.md                          | 408 ++++++++++++++++++-
 docs/fig/udagg-mechanism.png                    | Bin 0 -> 201262 bytes
 .../table/functions/AggregateFunction.scala     |   2 +-
 3 files changed, 403 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09344aa2/docs/dev/table/udfs.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md
index 55f58b6..6c9bc1a 100644
--- a/docs/dev/table/udfs.md
+++ b/docs/dev/table/udfs.md
@@ -24,15 +24,18 @@ under the License.
 
 User-defined functions are an important feature, because they significantly extend the expressiveness of queries.
 
-**TODO**
-
 * This will be replaced by the TOC
 {:toc}
 
 Register User-Defined Functions
 -------------------------------
+In most cases, a user-defined function must be registered before it can be used in a query. It is not necessary to register functions for the Scala Table API.
+
+Functions are registered at the `TableEnvironment` by calling a `registerFunction()` method. When a user-defined function is registered, it is inserted into the function catalog of the `TableEnvironment` such that the Table API or SQL parser can recognize and properly translate it. 
+
+Please find detailed examples of how to register and how to call each type of user-defined function
+(`ScalarFunction`, `TableFunction`, and `AggregateFunction`) in the following subsections.
 
-**TODO**
 
 {% top %}
 
@@ -97,8 +100,6 @@ tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
 
 By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
 
-Internally, the Table API and SQL code generation works with primitive values as much as possible. If a user-defined scalar function should not introduce much overhead through object creation/casting during runtime, it is recommended to declare parameters and result types as primitive types instead of their boxed classes. `Types.DATE` and `Types.TIME` can also be represented as `int`. `Types.TIMESTAMP` can be represented as `long`.
-
 The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.TIMESTAMP` by the code generation.
 
 <div class="codetabs" markdown="1">
@@ -264,10 +265,405 @@ class CustomTypeSplit extends TableFunction[Row] {
 
 {% top %}
 
+
 Aggregation Functions
 ---------------------
 
-**TODO**
+User-Defined Aggregate Functions (UDAGGs) aggregate a table (one or more rows with one or more attributes) to a scalar value.
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udagg-mechanism.png" width="80%">
+</center>
+
+The above figure shows an example of an aggregation. Assume you have a table that contains data about beverages. The table consists of three columns, `id`, `name` and `price` and 5 rows. Imagine you need to find the highest price of all beverages in the table, i.e., perform a `max()` aggregation. You would need to check each of the 5 rows and the result would be a single numeric value.
+
+User-defined aggregation functions are implemented by extending the `AggregateFunction` class. An `AggregateFunction` works as follows. First, it needs an `accumulator`, which is the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the `createAccumulator()` method of the `AggregateFunction`. Subsequently, the `accumulate()` method of the function is called for each input row to update the accumulator. Once all rows have been processed, the `getValue()` method of the function is called to compute and return the final result. 
+
+**The following methods are mandatory for each `AggregateFunction`:**
+
+- `createAccumulator()`
+- `accumulate()` 
+- `getValue()`
+
+Flink's type extraction facilities can fail to identify complex data types, e.g., if they are not basic types or simple POJOs. So, similar to `ScalarFunction` and `TableFunction`, `AggregateFunction` provides methods to specify the `TypeInformation` of the result type (through
+`AggregateFunction#getResultType()`) and the type of the accumulator (through `AggregateFunction#getAccumulatorType()`).
+ 
+Besides the above methods, there are a few contracted methods that can be
+optionally implemented. While some of these methods allow the system to execute queries more efficiently, others are mandatory for certain use cases. For instance, the `merge()` method is mandatory if the aggregation function should be applied in the context of a session group window (the accumulators of two session windows need to be joined when a row is observed that "connects" them).
+
+**The following methods of `AggregateFunction` are required depending on the use case:**
+
+- `retract()` is required for aggregations on bounded `OVER` windows.
+- `merge()` is required for many batch aggregations and session window aggregations.
+- `resetAccumulator()` is required for many batch aggregations.
+
+All methods of `AggregateFunction` must be declared as `public`, not `static`, and named exactly as mentioned above. The methods `createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are defined in the `AggregateFunction` abstract class, while the others are contracted methods. In order to define an aggregation function, one has to extend the base class `org.apache.flink.table.functions.AggregateFunction` and implement one (or more) `accumulate` methods.
+
+Detailed documentation for all methods of `AggregateFunction` is given below. 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+  * Base class for aggregation functions. 
+  *
+  * @param <T>   the type of the aggregation result
+  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
+  *             aggregated values which are needed to compute an aggregation result.
+  *             An AggregateFunction represents its state using the accumulator; therefore, the state of
+  *             the AggregateFunction must be put into the accumulator.
+  */
+public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
+
+  /**
+    * Creates and initializes the accumulator for this [[AggregateFunction]].
+    *
+    * @return the accumulator with the initial value
+    */
+  public ACC createAccumulator(); // MANDATORY
+
+  /** Processes the input values and updates the provided accumulator instance. The method
+    * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
+    * requires at least one accumulate() method.
+    *
+    * @param accumulator           the accumulator which contains the current aggregated results
+    * @param [user defined inputs] the input value (usually obtained from newly arrived data).
+    */
+  public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY
+
+  /**
+    * Retracts the input values from the accumulator instance. The current design assumes the
+    * inputs are the values that have been previously accumulated. The method retract can be
+    * overloaded with different custom types and arguments. This function must be implemented for
+    * datastream bounded over aggregate.
+    *
+    * @param accumulator           the accumulator which contains the current aggregated results
+    * @param [user defined inputs] the input value (usually obtained from newly arrived data).
+    */
+  public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL
+
+  /**
+    * Merges a group of accumulator instances into one accumulator instance. This function must be
+    * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
+    *
+    * @param accumulator  the accumulator which will keep the merged aggregate results. It should
+    *                     be noted that the accumulator may contain the previous aggregated
+    *                     results. Therefore user should not replace or clean this instance in the
+    *                     custom merge method.
+    * @param its          an [[java.lang.Iterable]] pointing to a group of accumulators that will be
+    *                     merged.
+    */
+  public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL
+
+  /**
+    * Called every time when an aggregation result should be materialized.
+    * The returned value could be either an early and incomplete result
+    * (periodically emitted as data arrive) or the final result of the
+    * aggregation.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @return the aggregation result
+    */
+  public T getValue(ACC accumulator); // MANDATORY
+
+  /**
+    * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
+    * dataset grouping aggregate.
+    *
+    * @param accumulator  the accumulator which needs to be reset
+    */
+  public void resetAccumulator(ACC accumulator); // OPTIONAL
+
+  /**
+    * Returns true if this AggregateFunction can only be applied in an OVER window.
+    *
+    * @return true if the AggregateFunction requires an OVER window, false otherwise.
+    */
+  public Boolean requiresOver = false; // PRE-DEFINED
+
+  /**
+    * Returns the TypeInformation of the AggregateFunction's result.
+    *
+    * @return The TypeInformation of the AggregateFunction's result or null if the result type
+    *         should be automatically inferred.
+    */
+  public TypeInformation<T> getResultType = null; // PRE-DEFINED
+
+  /**
+    * Returns the TypeInformation of the AggregateFunction's accumulator.
+    *
+    * @return The TypeInformation of the AggregateFunction's accumulator or null if the
+    *         accumulator type should be automatically inferred.
+    */
+  public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/**
+  * Base class for aggregation functions. 
+  *
+  * @tparam T   the type of the aggregation result
+  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
+  *             aggregated values which are needed to compute an aggregation result.
+  *             An AggregateFunction represents its state using the accumulator; therefore, the state of
+  *             the AggregateFunction must be put into the accumulator.
+  */
+abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
+  /**
+    * Creates and initializes the accumulator for this [[AggregateFunction]].
+    *
+    * @return the accumulator with the initial value
+    */
+  def createAccumulator(): ACC // MANDATORY
+
+  /**
+    * Processes the input values and updates the provided accumulator instance. The method
+    * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
+    * requires at least one accumulate() method.
+    *
+    * @param accumulator           the accumulator which contains the current aggregated results
+    * @param [user defined inputs] the input value (usually obtained from newly arrived data).
+    */
+  def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY
+
+  /**
+    * Retracts the input values from the accumulator instance. The current design assumes the
+    * inputs are the values that have been previously accumulated. The method retract can be
+    * overloaded with different custom types and arguments. This function must be implemented for
+    * datastream bounded over aggregate.
+    *
+    * @param accumulator           the accumulator which contains the current aggregated results
+    * @param [user defined inputs] the input value (usually obtained from newly arrived data).
+    */
+  def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL
+
+  /**
+    * Merges a group of accumulator instances into one accumulator instance. This function must be
+    * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
+    *
+    * @param accumulator  the accumulator which will keep the merged aggregate results. It should
+    *                     be noted that the accumulator may contain the previous aggregated
+    *                     results. Therefore user should not replace or clean this instance in the
+    *                     custom merge method.
+    * @param its          an [[java.lang.Iterable]] pointing to a group of accumulators that will be
+    *                     merged.
+    */
+  def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL
+  
+  /**
+    * Called every time when an aggregation result should be materialized.
+    * The returned value could be either an early and incomplete result
+    * (periodically emitted as data arrive) or the final result of the
+    * aggregation.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @return the aggregation result
+    */
+  def getValue(accumulator: ACC): T // MANDATORY
+
+  /**
+    * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
+    * dataset grouping aggregate.
+    *
+    * @param accumulator  the accumulator which needs to be reset
+    */
+  def resetAccumulator(accumulator: ACC): Unit // OPTIONAL
+
+  /**
+    * Returns true if this AggregateFunction can only be applied in an OVER window.
+    *
+    * @return true if the AggregateFunction requires an OVER window, false otherwise.
+    */
+  def requiresOver: Boolean = false // PRE-DEFINED
+
+  /**
+    * Returns the TypeInformation of the AggregateFunction's result.
+    *
+    * @return The TypeInformation of the AggregateFunction's result or null if the result type
+    *         should be automatically inferred.
+    */
+  def getResultType: TypeInformation[T] = null // PRE-DEFINED
+
+  /**
+    * Returns the TypeInformation of the AggregateFunction's accumulator.
+    *
+    * @return The TypeInformation of the AggregateFunction's accumulator or null if the
+    *         accumulator type should be automatically inferred.
+    */
+  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
+}
+{% endhighlight %}
+</div>
+</div>
+
+
+The following example shows how to
+
+- define an `AggregateFunction` that calculates the weighted average on a given column, 
+- register the function in the `TableEnvironment`, and 
+- use the function in a query.  
+
+To calculate a weighted average value, the accumulator needs to store the weighted sum and count of all the data that has been accumulated. In our example we define a class `WeightedAvgAccum` to be the accumulator. Accumulators are automatically backed up by Flink's checkpointing mechanism and restored in case of a failure to ensure exactly-once semantics.
+
+The `accumulate()` method of our `WeightedAvg` `AggregateFunction` has three inputs. The first one is the `WeightedAvgAccum` accumulator, the other two are user-defined inputs: input value `ivalue` and weight of the input `iweight`. Although the `retract()`, `merge()`, and `resetAccumulator()` methods are not mandatory for most aggregation types, we provide them below as examples. Please note that we used Java primitive types and defined `getResultType()` and `getAccumulatorType()` methods in the Scala example because Flink type extraction does not work very well for Scala types.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * Accumulator for WeightedAvg.
+ */
+public static class WeightedAvgAccum {
+    public long sum = 0;
+    public int count = 0;
+}
+
+/**
+ * Weighted Average user-defined aggregate function.
+ */
+public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
+
+    @Override
+    public WeightedAvgAccum createAccumulator() {
+        return new WeightedAvgAccum();
+    }
+
+    @Override
+    public Long getValue(WeightedAvgAccum acc) {
+        if (acc.count == 0) {
+            return null;
+        } else {
+            return acc.sum / acc.count;
+        }
+    }
+
+    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
+        acc.sum += iValue * iWeight;
+        acc.count += iWeight;
+    }
+
+    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
+        acc.sum -= iValue * iWeight;
+        acc.count -= iWeight;
+    }
+    
+    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
+        Iterator<WeightedAvgAccum> iter = it.iterator();
+        while (iter.hasNext()) {
+            WeightedAvgAccum a = iter.next();
+            acc.count += a.count;
+            acc.sum += a.sum;
+        }
+    }
+    
+    public void resetAccumulator(WeightedAvgAccum acc) {
+        acc.count = 0;
+        acc.sum = 0L;
+    }
+}
+
+// register function
+StreamTableEnvironment tEnv = ...
+tEnv.registerFunction("wAvg", new WeightedAvg());
+
+// use function
+tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import java.lang.{Long => JLong, Integer => JInteger}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+/**
+ * Accumulator for WeightedAvg.
+ */
+class WeightedAvgAccum extends JTuple2[JLong, JInteger] {
+  // f0 holds the weighted sum, f1 the accumulated weight; sum/count are typed accessors.
+  f0 = 0L; f1 = 0
+  def sum: Long = f0
+  def sum_=(value: Long): Unit = { f0 = value }
+  def count: Int = f1
+  def count_=(value: Int): Unit = { f1 = value }
+}
+
+/**
+ * Weighted Average user-defined aggregate function.
+ */
+class WeightedAvg extends AggregateFunction[JLong, WeightedAvgAccum] {
+
+  override def createAccumulator(): WeightedAvgAccum = {
+    new WeightedAvgAccum
+  }
+  
+  override def getValue(acc: WeightedAvgAccum): JLong = {
+    if (acc.count == 0) {
+        null
+    } else {
+        acc.sum / acc.count
+    }
+  }
+  
+  def accumulate(acc: WeightedAvgAccum, iValue: JLong, iWeight: JInteger): Unit = {
+    acc.sum += iValue * iWeight
+    acc.count += iWeight
+  }
+
+  def retract(acc: WeightedAvgAccum, iValue: JLong, iWeight: JInteger): Unit = {
+    acc.sum -= iValue * iWeight
+    acc.count -= iWeight
+  }
+    
+  def merge(acc: WeightedAvgAccum, it: java.lang.Iterable[WeightedAvgAccum]): Unit = {
+    val iter = it.iterator()
+    while (iter.hasNext) {
+      val a = iter.next()
+      acc.count += a.count
+      acc.sum += a.sum
+    }
+  }
+
+  def resetAccumulator(acc: WeightedAvgAccum): Unit = {
+    acc.count = 0
+    acc.sum = 0L
+  }
+
+  override def getAccumulatorType: TypeInformation[WeightedAvgAccum] = {
+    new TupleTypeInfo(classOf[WeightedAvgAccum], 
+                      BasicTypeInfo.LONG_TYPE_INFO,
+                      BasicTypeInfo.INT_TYPE_INFO)
+  }
+
+  override def getResultType: TypeInformation[JLong] =
+    BasicTypeInfo.LONG_TYPE_INFO
+}
+
+// register function
+val tEnv: StreamTableEnvironment = ???
+tEnv.registerFunction("wAvg", new WeightedAvg())
+
+// use function
+tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+
+{% endhighlight %}
+</div>
+</div>
+
+
+{% top %}
+
+Best Practices for Implementing UDFs
+------------------------------------
+
+The Table API and SQL code generation internally tries to work with primitive values as much as possible. A user-defined function can introduce significant overhead through object creation, casting, and (un)boxing. Therefore, it is highly recommended to declare parameters and result types as primitive types instead of their boxed classes. `Types.DATE` and `Types.TIME` can also be represented as `int`. `Types.TIMESTAMP` can be represented as `long`.
+
+We recommend that user-defined functions be written in Java instead of Scala, as Scala types pose a challenge for Flink's type extractor.
 
 {% top %}
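
To make the primitive-type advice concrete, here is a hedged sketch of a scalar function that keeps its parameter and result as primitive long values and declares the result as Types.TIMESTAMP via getResultType(), in the spirit of the timestamp example referenced earlier on this page (the class name AddOneHour is ours):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction

// Shifts a timestamp by one hour; parameter and result stay primitive longs (milliseconds),
// so the generated code avoids boxing. getResultType() tells the planner to interpret the
// long result as Types.TIMESTAMP.
class AddOneHour extends ScalarFunction {
  def eval(timestamp: Long): Long = timestamp + 60L * 60L * 1000L

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.TIMESTAMP
}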
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09344aa2/docs/fig/udagg-mechanism.png
----------------------------------------------------------------------
diff --git a/docs/fig/udagg-mechanism.png b/docs/fig/udagg-mechanism.png
new file mode 100644
index 0000000..043196f
Binary files /dev/null and b/docs/fig/udagg-mechanism.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/09344aa2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
index 8f50971..d3f9497 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
   *  - merge, and
   *  - resetAccumulator
   *
-  * All these methods muse be declared publicly, not static and named exactly as the names
+  * All these methods must be declared publicly, not static and named exactly as the names
   * mentioned above. The methods createAccumulator and getValue are defined in the
   * [[AggregateFunction]] functions, while other methods are explained below.
   *


[5/5] flink git commit: [FLINK-7227] [table] Fix push-down of disjunctive predicates with more than two terms.

Posted by fh...@apache.org.
[FLINK-7227] [table] Fix push-down of disjunctive predicates with more than two terms.

This closes #4608.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c11bd7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c11bd7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c11bd7f

Branch: refs/heads/master
Commit: 7c11bd7f4127bc550a8fd17fc0fd843b929e3cb2
Parents: 4cf737c
Author: Usman Younas <us...@Usmans-MBP.fritz.box>
Authored: Mon Aug 28 13:44:02 2017 +0000
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 5 14:21:49 2017 +0200

----------------------------------------------------------------------
 .../table/plan/util/RexProgramExtractor.scala   |  7 +-
 .../table/plan/RexProgramExtractorTest.scala    | 73 +++++++++++++++++---
 .../table/utils/TestFilterableTableSource.scala |  1 +
 3 files changed, 72 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c11bd7f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
index bf9a688..53bf8e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
@@ -20,10 +20,11 @@ package org.apache.flink.table.plan.util
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
+import org.apache.flink.table.expressions.{And, Expression, Literal, Or, ResolvedFieldReference}
 import org.apache.flink.table.validate.FunctionCatalog
 import org.apache.flink.util.Preconditions
 
@@ -170,6 +171,10 @@ class RexNodeToExpressionConverter(
       None
     } else {
         call.getOperator match {
+          case SqlStdOperatorTable.OR =>
+            Option(operands.reduceLeft(Or))
+          case SqlStdOperatorTable.AND =>
+            Option(operands.reduceLeft(And))
           case function: SqlFunction =>
             lookupFunction(replace(function.getName), operands)
           case postfix: SqlPostfixOperator =>
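
The core of the fix: Calcite emits AND/OR as n-ary RexCalls, while the Table API's And/Or expressions are binary, so the operand list is folded left-to-right with reduceLeft. A self-contained sketch of that folding (simplified stand-in types, not the actual Table API classes):

// Simplified stand-ins for the Table API expression classes, just to show the fold.
sealed trait Expr
case class Pred(text: String) extends Expr
case class Or(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

val operands: Seq[Expr] = Seq(Pred("amount < 100"), Pred("id > 100"), Pred("price = 200"))

// reduceLeft(Or) folds an n-ary operand list into nested binary Ors:
// Or(Or(Pred(amount < 100), Pred(id > 100)), Pred(price = 200))
val folded: Expr = operands.reduceLeft(Or)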

http://git-wip-us.apache.org/repos/asf/flink/blob/7c11bd7f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
index 840be17..c2a01c6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
@@ -20,12 +20,13 @@ package org.apache.flink.table.plan
 
 import java.math.BigDecimal
 
-import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlPostfixOperator
 import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.plan.util.{RexNodeToExpressionConverter, RexProgramExtractor}
 import org.apache.flink.table.utils.InputTypeBuilder.inputOf
 import org.apache.flink.table.validate.FunctionCatalog
 import org.hamcrest.CoreMatchers.is
@@ -33,6 +34,7 @@ import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat}
 import org.junit.Test
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 class RexProgramExtractorTest extends RexProgramTestBase {
 
@@ -104,6 +106,8 @@ class RexProgramExtractorTest extends RexProgramTestBase {
     val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3)
     // 100
     val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+    // 200
+    val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(200L))
 
     // a = amount < 100
     val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0, t3))
@@ -113,15 +117,17 @@ class RexProgramExtractorTest extends RexProgramTestBase {
     val c = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, t2, t3))
     // d = amount <= id
     val d = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, t0, t1))
+    // e = price == 200
+    val e = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, t2, t4))
 
     // a AND b
     val and = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(a, b).asJava))
-    // (a AND b) or c
-    val or = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.OR, List(and, c).asJava))
-    // not d
+    // (a AND b) OR c OR e
+    val or = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.OR, List(and, c, e).asJava))
+    // NOT d
     val not = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.NOT, List(d).asJava))
 
-    // (a AND b) OR c) AND (NOT d)
+    // ((a AND b) OR c OR e) AND (NOT d)
     builder.addCondition(builder.addExpr(
       rexBuilder.makeCall(SqlStdOperatorTable.AND, List(or, not).asJava)))
 
@@ -134,14 +140,65 @@ class RexProgramExtractorTest extends RexProgramTestBase {
         functionCatalog)
 
     val expected: Array[Expression] = Array(
-      ExpressionParser.parseExpression("amount < 100 || price == 100"),
-      ExpressionParser.parseExpression("id > 100 || price == 100"),
+      ExpressionParser.parseExpression("amount < 100 || price == 100 || price === 200"),
+      ExpressionParser.parseExpression("id > 100 || price == 100 || price === 200"),
       ExpressionParser.parseExpression("!(amount <= id)"))
     assertExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)
   }
 
   @Test
+  def testExtractANDExpressions(): Unit = {
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    // amount
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+    // id
+    val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+    // price
+    val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3)
+    // 100
+    val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+
+    // a = amount < 100
+    val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0, t3))
+    // b = id > 100
+    val b = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t3))
+    // c = price == 100
+    val c = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, t2, t3))
+    // d = amount <= id
+    val d = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, t0, t1))
+
+    // a AND b AND c AND d
+    val and = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(a, b, c, d).asJava))
+
+    builder.addCondition(builder.addExpr(and))
+
+    val program = builder.getProgram
+    val relBuilder: RexBuilder = new RexBuilder(typeFactory)
+
+    val expanded = program.expandLocalRef(program.getCondition)
+
+    var convertedExpressions = new mutable.ArrayBuffer[Expression]
+    val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+    val inputNames = program.getInputRowType.getFieldNames.asScala.toArray
+    val converter = new RexNodeToExpressionConverter(inputNames, functionCatalog)
+
+    expanded.accept(converter) match {
+      case Some(expression) =>
+        convertedExpressions += expression
+      case None => unconvertedRexNodes += expanded
+    }
+
+    val expected: Array[Expression] = Array(
+      ExpressionParser.parseExpression("amount < 100 && id > 100 && price === 100 && amount <= id"))
+
+    assertExpressionArrayEquals(expected, convertedExpressions.toArray)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  @Test
   def testExtractArithmeticConditions(): Unit = {
     val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
     val builder = new RexProgramBuilder(inputRowType, rexBuilder)

http://git-wip-us.apache.org/repos/asf/flink/blob/7c11bd7f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
index dcf2acd..fb99864 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
@@ -89,6 +89,7 @@ class TestFilterableTableSource(
               iterator.remove()
             case (_, _) =>
           }
+        case _ =>
       }
     }