You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/04/04 21:32:23 UTC
[4/4] flink git commit: [FLINK-1159] [Scala API] Add API extension to
support case-style anonymous functions
[FLINK-1159] [Scala API] Add API extension to support case-style anonymous functions
This closes #1704
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cb84f18
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cb84f18
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cb84f18
Branch: refs/heads/master
Commit: 5cb84f185963fa89be5d0c4e83bad66bac44d84d
Parents: 3465580
Author: Stefano Baghino <st...@baghino.me>
Authored: Wed Feb 24 13:05:16 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 4 21:31:49 2016 +0200
----------------------------------------------------------------------
docs/apis/batch/index.md | 9 +
docs/apis/java8.md | 2 +-
docs/apis/scala_api_extensions.md | 409 +++++++++++++++++++
docs/apis/streaming/index.md | 10 +
.../OnCoGroupDataSet.scala | 51 +++
.../acceptPartialFunctions/OnCrossDataSet.scala | 48 +++
.../impl/acceptPartialFunctions/OnDataSet.scala | 118 ++++++
.../OnGroupedDataSet.scala | 87 ++++
.../OnHalfUnfinishedKeyPairOperation.scala | 46 +++
.../OnJoinFunctionAssigner.scala | 48 +++
.../OnUnfinishedKeyPairOperation.scala | 48 +++
.../flink/api/scala/extensions/package.scala | 95 +++++
.../extensions/base/AcceptPFTestBase.scala | 38 ++
.../scala/extensions/data/KeyValuePair.scala | 26 ++
.../OnCoGroupDataSetTest.scala | 57 +++
.../OnCrossDataSetTest.scala | 49 +++
.../acceptPartialFunctions/OnDataSetTest.scala | 171 ++++++++
.../OnGroupedDataSetTest.scala | 120 ++++++
.../OnHalfUnfinishedKeyPairOperationTest.scala | 150 +++++++
.../OnJoinFunctionAssignerTest.scala | 141 +++++++
.../OnUnfinishedKeyPairOperationTest.scala | 137 +++++++
.../streaming/api/scala/ConnectedStreams.scala | 5 +-
.../streaming/api/scala/JoinedStreams.scala | 2 +-
.../OnConnectedStream.scala | 79 ++++
.../acceptPartialFunctions/OnDataStream.scala | 78 ++++
.../acceptPartialFunctions/OnJoinedStream.scala | 50 +++
.../acceptPartialFunctions/OnKeyedStream.scala | 55 +++
.../OnWindowedStream.scala | 90 ++++
.../api/scala/extensions/package.scala | 89 ++++
.../extensions/base/AcceptPFTestBase.scala | 54 +++
.../scala/extensions/data/KeyValuePair.scala | 26 ++
.../OnConnectedDataStreamTest.scala | 102 +++++
.../OnDataStreamTest.scala | 109 +++++
.../OnJoinedDataStreamTest.scala | 67 +++
.../OnKeyedDataStreamTest.scala | 69 ++++
.../OnWindowedDataStreamTest.scala | 97 +++++
36 files changed, 2827 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/docs/apis/batch/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/index.md b/docs/apis/batch/index.md
index 2a954d9..ff53219 100644
--- a/docs/apis/batch/index.md
+++ b/docs/apis/batch/index.md
@@ -729,6 +729,15 @@ val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
</tbody>
</table>
+Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:
+{% highlight scala %}
+val data: DataSet[(Int, String, Double)] = // [...]
+data.map {
+ case (id, name, temperature) => // [...]
+}
+{% endhighlight %}
+is not supported by the API out-of-the-box. To use this feature, you should use a <a href="../scala_api_extensions.html">Scala API extension</a>.
+
</div>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/docs/apis/java8.md
----------------------------------------------------------------------
diff --git a/docs/apis/java8.md b/docs/apis/java8.md
index 53269e3..821038b 100644
--- a/docs/apis/java8.md
+++ b/docs/apis/java8.md
@@ -2,7 +2,7 @@
title: "Java 8 Programming Guide"
# Top-level navigation
top-nav-group: apis
-top-nav-pos: 11
+top-nav-pos: 12
top-nav-title: Java 8
---
<!--
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/docs/apis/scala_api_extensions.md
----------------------------------------------------------------------
diff --git a/docs/apis/scala_api_extensions.md b/docs/apis/scala_api_extensions.md
new file mode 100644
index 0000000..e3268bf
--- /dev/null
+++ b/docs/apis/scala_api_extensions.md
@@ -0,0 +1,409 @@
+---
+title: "Scala API Extensions"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 11
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+In order to keep a fair amount of consistency between the Scala and Java APIs, some
+of the features that allow a high level of expressiveness in Scala have been left
+out from the standard APIs for both batch and streaming.
+
+If you want to _enjoy the full Scala experience_ you can choose to opt-in to
+extensions that enhance the Scala API via implicit conversions.
+
+To use all the available extensions, you can just add a simple `import` for the
+DataSet API
+
+{% highlight scala %}
+import org.apache.flink.api.scala.extensions._
+{% endhighlight %}
+
+or the DataStream API
+
+{% highlight scala %}
+import org.apache.flink.streaming.api.scala.extensions._
+{% endhighlight %}
+
+Alternatively, you can import individual extensions _à la carte_ to only use those
+you prefer.
+
+## Accept partial functions
+
+Normally, both the DataSet and DataStream APIs don't accept anonymous pattern
+matching functions to deconstruct tuples, case classes or collections, like the
+following:
+
+{% highlight scala %}
+val data: DataSet[(Int, String, Double)] = // [...]
+data.map {
+ case (id, name, temperature) => // [...]
+ // The previous line causes the following compilation error:
+ // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
+}
+{% endhighlight %}
+
+This extension introduces new methods in both the DataSet and DataStream Scala API
+that have a one-to-one correspondence in the extended API. These delegating methods
+do support anonymous pattern matching functions.
+
+#### DataSet API
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Method</th>
+ <th class="text-left" style="width: 20%">Original</th>
+ <th class="text-center">Example</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td><strong>mapWith</strong></td>
+ <td><strong>map (DataSet)</strong></td>
+ <td>
+{% highlight scala %}
+data.mapWith {
+ case (_, value) => value.toString
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>mapPartitionWith</strong></td>
+ <td><strong>mapPartition (DataSet)</strong></td>
+ <td>
+{% highlight scala %}
+data.mapPartitionWith {
+ case head #:: _ => head
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>flatMapWith</strong></td>
+ <td><strong>flatMap (DataSet)</strong></td>
+ <td>
+{% highlight scala %}
+data.flatMapWith {
+ case (_, name, visitTimes) => visitTimes.map(name -> _)
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>filterWith</strong></td>
+ <td><strong>filter (DataSet)</strong></td>
+ <td>
+{% highlight scala %}
+data.filterWith {
+ case Train(_, isOnTime) => isOnTime
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>reduceWith</strong></td>
+ <td><strong>reduce (DataSet, GroupedDataSet)</strong></td>
+ <td>
+{% highlight scala %}
+data.reduceWith {
+  case ((id, amount1), (_, amount2)) => (id, amount1 + amount2)
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>reduceGroupWith</strong></td>
+ <td><strong>reduceGroup (GroupedDataSet)</strong></td>
+ <td>
+{% highlight scala %}
+data.reduceGroupWith {
+ case id #:: value #:: _ => id -> value
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>groupingBy</strong></td>
+ <td><strong>groupBy (DataSet)</strong></td>
+ <td>
+{% highlight scala %}
+data.groupingBy {
+ case (id, _, _) => id
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>sortGroupWith</strong></td>
+ <td><strong>sortGroup (GroupedDataSet)</strong></td>
+ <td>
+{% highlight scala %}
+grouped.sortGroupWith(Order.ASCENDING) {
+ case House(_, value) => value
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>combineGroupWith</strong></td>
+ <td><strong>combineGroup (GroupedDataSet)</strong></td>
+ <td>
+{% highlight scala %}
+grouped.combineGroupWith {
+ case header #:: amounts => amounts.sum
+}
+{% endhighlight %}
+ </td>
+ <tr>
+ <td><strong>projecting</strong></td>
+ <td><strong>apply (JoinDataSet, CrossDataSet)</strong></td>
+ <td>
+{% highlight scala %}
+data1.join(data2).
+ whereClause(case (pk, _) => pk).
+ isEqualTo(case (_, fk) => fk).
+ projecting {
+ case ((pk, tx), (products, fk)) => tx -> products
+ }
+
+data1.cross(data2).projecting {
+ case ((a, _), (_, b) => a -> b
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>projecting</strong></td>
+ <td><strong>apply (CoGroupDataSet)</strong></td>
+ <td>
+{% highlight scala %}
+data1.coGroup(data2).
+ whereClause(case (pk, _) => pk).
+ isEqualTo(case (_, fk) => fk).
+ projecting {
+ case (head1 #:: _, head2 #:: _) => head1 -> head2
+ }
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ </tr>
+ </tbody>
+</table>
+
+#### DataStream API
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Method</th>
+ <th class="text-left" style="width: 20%">Original</th>
+ <th class="text-center">Example</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td><strong>mapWith</strong></td>
+ <td><strong>map (DataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data.mapWith {
+ case (_, value) => value.toString
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>mapPartitionWith</strong></td>
+ <td><strong>mapPartition (DataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data.mapPartitionWith {
+ case head #:: _ => head
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>flatMapWith</strong></td>
+ <td><strong>flatMap (DataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data.flatMapWith {
+ case (_, name, visits) => visits.map(name -> _)
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>filterWith</strong></td>
+ <td><strong>filter (DataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data.filterWith {
+ case Train(_, isOnTime) => isOnTime
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>keyingBy</strong></td>
+ <td><strong>keyBy (DataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data.keyingBy {
+ case (id, _, _) => id
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>mapWith</strong></td>
+ <td><strong>map (ConnectedDataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data.mapWith(
+  map1 = { case (_, value) => value.toString },
+  map2 = { case (_, _, value, _) => value + 1 }
+)
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>flatMapWith</strong></td>
+ <td><strong>flatMap (ConnectedDataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data.flatMapWith(
+  flatMap1 = { case (_, json) => parse(json) },
+  flatMap2 = { case (_, _, json, _) => parse(json) }
+)
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>keyingBy</strong></td>
+ <td><strong>keyBy (ConnectedDataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data.keyingBy(
+  key1 = { case (_, timestamp) => timestamp },
+  key2 = { case (id, _, _) => id }
+)
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>reduceWith</strong></td>
+ <td><strong>reduce (KeyedDataStream, WindowedDataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data.reduceWith {
+  case ((id, sum1), (_, sum2)) => (id, sum1 + sum2)
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>foldWith</strong></td>
+ <td><strong>fold (KeyedDataStream, WindowedDataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data.foldWith(User(bought = 0)) {
+ case (User(b), (_, items)) => User(b + items.size)
+}
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>applyWith</strong></td>
+ <td><strong>apply (WindowedDataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data.applyWith(0)(
+  foldFunction = { case (sum, amount) => sum + amount },
+  windowFunction = {
+    case (k, w, sum) => // [...]
+  }
+)
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>projecting</strong></td>
+ <td><strong>apply (JoinedDataStream)</strong></td>
+ <td>
+{% highlight scala %}
+data1.join(data2).
+  whereClause { case (pk, _) => pk }.
+  isEqualTo { case (_, fk) => fk }.
+  projecting {
+    case ((pk, tx), (products, fk)) => tx -> products
+  }
+{% endhighlight %}
+ </td>
+ </tr>
+ </tbody>
+</table>
+
+
+
+For more information on the semantics of each method, please refer to the
+[DataSet](batch/index.html) and [DataStream](streaming/index.html) API documentation.
+
+To use this extension exclusively, you can add the following `import`:
+
+{% highlight scala %}
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+{% endhighlight %}
+
+for the DataSet extensions and
+
+{% highlight scala %}
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+{% endhighlight %}
+
+for the DataStream extensions.
+
+The following snippet shows a minimal example of how to use these extension
+methods together (with the DataSet API):
+
+{% highlight scala %}
+object Main {
+  import org.apache.flink.api.scala._
+  import org.apache.flink.api.scala.extensions._
+  case class Point(x: Double, y: Double)
+  def main(args: Array[String]): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
+    ds.filterWith {
+      case Point(x, _) => x > 1
+    }.reduceWith {
+      case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
+    }.mapWith {
+      case Point(x, y) => (x, y)
+    }.flatMapWith {
+      case (x, y) => Seq("x" -> x, "y" -> y)
+    }.groupingBy {
+      case (id, value) => id
+    }
+  }
+}
+{% endhighlight %}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/docs/apis/streaming/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
index 6d69459..a899844 100644
--- a/docs/apis/streaming/index.md
+++ b/docs/apis/streaming/index.md
@@ -873,6 +873,16 @@ stream.assignTimestamps { timestampExtractor }
</tbody>
</table>
+Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:
+{% highlight scala %}
+val data: DataStream[(Int, String, Double)] = // [...]
+data.map {
+ case (id, name, temperature) => // [...]
+}
+{% endhighlight %}
+is not supported by the API out-of-the-box. To use this feature, you should use a <a href="../scala_api_extensions.html">Scala API extension</a>.
+
+
</div>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSet.scala
new file mode 100644
index 0000000..0337d44
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSet.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{CoGroupDataSet, DataSet}
+
+import scala.reflect.ClassTag
+
+/**
+ * Enriches a co-group data set so that its projection can be written as an
+ * anonymous pattern matching function, deconstructing tuples, case class
+ * instances or collections directly in the `case` clause.
+ *
+ * @param ds The co-group data set being wrapped
+ * @tparam L The type of the items of the left data set
+ * @tparam R The type of the items of the right data set
+ */
+class OnCoGroupDataSet[L, R](ds: CoGroupDataSet[L, R]) {
+
+  /**
+   * Co-groups the two inputs, projecting every pair of co-grouped item collections
+   * into a single element of the resulting data set by means of `fun`.
+   *
+   * Each group is handed to `fun` as a `Stream`, so it can be deconstructed with
+   * patterns such as `case (head1 #:: _, head2 #:: _) => ...`.
+   *
+   * @param fun The projection applied to each pair of co-grouped collections
+   * @tparam O The type produced by the projection, for which type information must be known
+   * @return A fully co-grouped data set of Os
+   */
+  @PublicEvolving
+  def projecting[O: TypeInformation: ClassTag](fun: (Stream[L], Stream[R]) => O): DataSet[O] =
+    ds.apply { (leftGroup, rightGroup) =>
+      // Left is converted before right, mirroring the argument order of `fun`.
+      val leftItems = leftGroup.toStream
+      val rightItems = rightGroup.toStream
+      fun(leftItems, rightItems)
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSet.scala
new file mode 100644
index 0000000..a0d4ea1
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSet.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{CrossDataSet, DataSet}
+
+import scala.reflect.ClassTag
+
+/**
+ * Wraps a cross data set, allowing to use anonymous partial functions to
+ * perform extraction of items in a tuple, case class instance or collection
+ *
+ * @param ds The wrapped cross data set
+ * @tparam L The type of the left data set items
+ * @tparam R The type of the right data set items
+ */
+class OnCrossDataSet[L, R](ds: CrossDataSet[L, R]) {
+
+  /**
+   * Starting from a cross data set, uses the function `fun` to project each pair of
+   * elements of the cross product (one item from each input data set) into the
+   * resulting data set
+   *
+   * @param fun The function that defines the projection of the cross product
+   * @tparam O The return type of the projection, for which type information must be known
+   * @return A data set of Os
+   */
+  @PublicEvolving
+  def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] =
+    ds(fun)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
new file mode 100644
index 0000000..b2521b0
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
+
+import scala.reflect.ClassTag
+
+/**
+ * Wraps a data set, allowing to use anonymous partial functions to
+ * perform extraction of items in a tuple, case class instance or collection
+ *
+ * @param ds The wrapped data set
+ * @tparam T The type of the data set items
+ */
+class OnDataSet[T](ds: DataSet[T]) {
+
+  /**
+   * Applies a function `fun` to each item of the data set
+   *
+   * @param fun The function to be applied to each item
+   * @tparam R The type of the items in the returned data set
+   * @return A data set of Rs
+   */
+  @PublicEvolving
+  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
+    ds.map(fun)
+
+  /**
+   * Applies a function `fun` to a partition as a whole. The items of the partition
+   * are exposed to `fun` as a `Stream`, and the single value returned by `fun` is
+   * emitted for each invocation.
+   *
+   * Note that a `Stream` memoizes the items it has already evaluated: holding on to
+   * its head keeps every traversed item of the partition in memory.
+   *
+   * @param fun The function to be applied on the whole partition
+   * @tparam R The type of the items in the returned data set
+   * @return A data set of Rs
+   */
+  @PublicEvolving
+  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
+    ds.mapPartition {
+      (it, out) =>
+        out.collect(fun(it.toStream))
+    }
+
+  /**
+   * Applies a function `fun` to each item of the data set, producing a collection of items
+   * that will be flattened in the resulting data set
+   *
+   * @param fun The function to be applied to each item
+   * @tparam R The type of the items in the returned data set
+   * @return A data set of Rs
+   */
+  @PublicEvolving
+  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R] =
+    ds.flatMap(fun)
+
+  /**
+   * Applies a predicate `fun` to each item of the data set, keeping only those for which
+   * the predicate holds
+   *
+   * @param fun The predicate to be tested on each item
+   * @return A data set of the Ts for which the predicate holds
+   */
+  @PublicEvolving
+  def filterWith(fun: T => Boolean): DataSet[T] =
+    ds.filter(fun)
+
+  /**
+   * Reduces the data set by repeatedly combining pairs of items with the reducer `fun`
+   *
+   * @param fun The reducing function to be applied on the whole data set
+   * @return A data set of Ts holding the result of the reduction
+   */
+  @PublicEvolving
+  def reduceWith(fun: (T, T) => T): DataSet[T] =
+    ds.reduce(fun)
+
+  /**
+   * Applies a group reducer `fun` to the (ungrouped) data set. The items are exposed to
+   * `fun` as a `Stream`, and the single value returned by `fun` is emitted for each
+   * invocation. To reduce group-wise, group the data set first (e.g. with `groupingBy`).
+   *
+   * @param fun The function to be applied to the items of the data set
+   * @tparam R The type of the items in the returned data set
+   * @return A data set of Rs
+   */
+  @PublicEvolving
+  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
+    ds.reduceGroup {
+      (it, out) =>
+        out.collect(fun(it.toStream))
+    }
+
+  /**
+   * Groups the items according to a grouping function `fun`
+   *
+   * @param fun The grouping function
+   * @tparam K The return type of the grouping function, for which type information must be known
+   * @return A grouped data set of Ts
+   */
+  @PublicEvolving
+  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T] =
+    ds.groupBy(fun)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
new file mode 100644
index 0000000..07abccb
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
+
+import scala.reflect.ClassTag
+
+/**
+ * Wraps a grouped data set, allowing to use anonymous partial functions to
+ * perform extraction of items in a tuple, case class instance or collection
+ *
+ * @param ds The wrapped grouped data set
+ * @tparam T The type of the grouped data set items
+ */
+class OnGroupedDataSet[T](ds: GroupedDataSet[T]) {
+
+  /**
+   * Sorts the items within each group by the key extracted with `fun`, using the given `Order`
+   *
+   * @param order The ordering strategy (ascending, descending, etc.)
+   * @param fun The sorting function, defining the sorting key
+   * @tparam K The key type, for which type information must be known
+   * @return A grouped data set whose groups are sorted
+   */
+  @PublicEvolving
+  def sortGroupWith[K: TypeInformation](order: Order)(fun: T => K): GroupedDataSet[T] =
+    ds.sortGroup(fun, order)
+
+  /**
+   * Reduces each group of the data set by repeatedly combining pairs of its items
+   * with the reducer `fun`
+   *
+   * @param fun The reducing function
+   * @return A data set of Ts reduced group-wise
+   */
+  @PublicEvolving
+  def reduceWith(fun: (T, T) => T): DataSet[T] =
+    ds.reduce(fun)
+
+  /**
+   * Reduces the data set group-wise with a group reducer `fun`. The items of each group
+   * are exposed to `fun` as a `Stream`, and the single value returned by `fun` is emitted
+   * for each invocation.
+   *
+   * @param fun The reducing function
+   * @tparam R The type of the items in the resulting data set
+   * @return A data set of Rs reduced group-wise
+   */
+  @PublicEvolving
+  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
+    ds.reduceGroup {
+      (it, out) =>
+        out.collect(fun(it.toStream))
+    }
+
+  /**
+   * Same as a group reducing operation but only acts locally,
+   * ideal to perform pre-aggregation before a reduction. The items are exposed
+   * to `fun` as a `Stream`, and the single value returned by `fun` is emitted for
+   * each invocation.
+   *
+   * @param fun The combining function
+   * @tparam R The type of the items in the resulting data set
+   * @return A data set of Rs combined group-wise
+   */
+  @PublicEvolving
+  def combineGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
+    ds.combineGroup {
+      (it, out) =>
+        out.collect(fun(it.toStream))
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperation.scala
new file mode 100644
index 0000000..a77c405
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperation.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.HalfUnfinishedKeyPairOperation
+
+/**
+ * Wraps a half unfinished key pair operation, allowing to use anonymous partial functions to
+ * perform extraction of items in a tuple, case class instance or collection
+ *
+ * @param ds The wrapped half unfinished key pair operation
+ * @tparam L The type of the left data set items
+ * @tparam R The type of the right data set items
+ * @tparam O The type of the operation obtained once the key of the right data set
+ *           items has been defined (e.g. a join or co-group operation)
+ */
+class OnHalfUnfinishedKeyPairOperation[L, R, O](ds: HalfUnfinishedKeyPairOperation[L, R, O]) {
+
+  /**
+   * Completes the key definition of a join or co-group operation by defining the second
+   * half of the where clause: the key extracted from each item of the right data set,
+   * which is tested for equality against the key of the left data set items.
+   *
+   * @param fun The function that extracts the key from the right data set items
+   * @tparam K The type of the key, for which type information must be known
+   * @return The resulting operation, of type O
+   */
+  @PublicEvolving
+  def isEqualTo[K: TypeInformation](fun: R => K): O =
+    ds.equalTo(fun)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssigner.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssigner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssigner.scala
new file mode 100644
index 0000000..4ab41e5
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssigner.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{DataSet, JoinFunctionAssigner}
+
+import scala.reflect.ClassTag
+
+/**
+ * Adds a uniquely named projection method to a [[JoinFunctionAssigner]], so that the
+ * joined elements can be decomposed with a pattern-matching anonymous function
+ * (tuples, case class instances or collections).
+ *
+ * @param ds The join function assigner being wrapped
+ * @tparam L Element type of the left input
+ * @tparam R Element type of the right input
+ */
+class OnJoinFunctionAssigner[L, R](ds: JoinFunctionAssigner[L, R]) {
+
+  /**
+   * Completes the join, using `fun` to turn each pair of joined elements into an
+   * element of the resulting data set.
+   *
+   * @param fun Projection from a (left, right) pair of joined elements to an output element
+   * @tparam O Output element type; its type information and class tag must be available
+   * @return The joined data set of `O`
+   */
+  @PublicEvolving
+  def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] =
+    ds.apply[O](fun)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperation.scala
new file mode 100644
index 0000000..4fa6fcc
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperation.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{HalfUnfinishedKeyPairOperation, UnfinishedKeyPairOperation}
+
+/**
+ * Wraps an [[UnfinishedKeyPairOperation]] (a join or co-group whose keys have not been
+ * defined yet), allowing anonymous partial functions to be used to extract the left-hand
+ * key from tuples, case class instances or collections.
+ *
+ * @param ds The wrapped unfinished key pair operation
+ * @tparam L The type of the left data set items
+ * @tparam R The type of the right data set items
+ * @tparam O The type of the operation obtained once both keys are defined
+ */
+class OnUnfinishedKeyPairOperation[L, R, O](ds: UnfinishedKeyPairOperation[L, R, O]) {
+
+  /**
+   * Starts the key definition of a join or co-group operation by specifying the first
+   * half of the where clause, i.e. the key extracted from the left data set items that
+   * will be checked for equality with the key provided by the second half.
+   *
+   * Delegates to `where` under a distinct, non-overloaded name so that a
+   * pattern-matching anonymous function such as `{ case (id, _) => id }` can be passed.
+   *
+   * @param fun The function that extracts the key from the left data set items
+   * @tparam K The type of the key, for which type information must be known
+   * @return The half-unfinished operation, on which the right-hand key still has to be defined
+   */
+  @PublicEvolving
+  def whereClause[K: TypeInformation](fun: L => K): HalfUnfinishedKeyPairOperation[L, R, O] =
+    ds.where(fun)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala
new file mode 100644
index 0000000..7e5ab8a
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions._
+
+/**
+ * The `acceptPartialFunctions` extension enriches the DataSet API with uniquely named
+ * methods that delegate to the core higher-order functions (e.g. `map`). Overloaded
+ * methods taking functions as parameters cannot accept pattern-matching anonymous
+ * functions (`{ case ... => ... }`); the uniquely named variants can, which makes it
+ * possible to decompose inputs such as tuples, case class instances and collections
+ * directly with pattern matching.
+ *
+ * The following small example shows how these extensions work on a Flink data set:
+ *
+ * {{{
+ * object Main {
+ *   import org.apache.flink.api.scala._
+ *   import org.apache.flink.api.scala.extensions._
+ *   case class Point(x: Double, y: Double)
+ *   def main(args: Array[String]): Unit = {
+ *     val env = ExecutionEnvironment.getExecutionEnvironment
+ *     val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
+ *     ds.filterWith {
+ *       case Point(x, _) => x > 1
+ *     }.reduceWith {
+ *       case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
+ *     }.mapWith {
+ *       case Point(x, y) => (x, y)
+ *     }.flatMapWith {
+ *       case (x, y) => Seq('x' -> x, 'y' -> y)
+ *     }.groupingBy {
+ *       case (id, _) => id
+ *     }
+ *   }
+ * }
+ * }}}
+ *
+ * The extension consists of several implicit conversions, one for each data set
+ * representation that benefits from this feature. To use these extension methods the
+ * user has to explicitly opt in by importing
+ * `org.apache.flink.api.scala.extensions.acceptPartialFunctions`
+ * (or `org.apache.flink.api.scala.extensions._`).
+ *
+ * For more information and usage examples please consult the Apache Flink official documentation.
+ */
+package object extensions {
+
+  /**
+   * Enriches a [[DataSet]] with `mapWith`, `filterWith`, `groupingBy` and the other
+   * operators of [[OnDataSet]].
+   *
+   * @param ds The data set to wrap
+   * @tparam T The type of the data set items
+   */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[T](ds: DataSet[T]): OnDataSet[T] =
+    new OnDataSet[T](ds)
+
+  /**
+   * Enriches a [[JoinFunctionAssigner]] with `projecting`.
+   *
+   * @param ds The join function assigner to wrap
+   * @tparam L The type of the left data set items
+   * @tparam R The type of the right data set items
+   */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[L, R](
+      ds: JoinFunctionAssigner[L, R]): OnJoinFunctionAssigner[L, R] =
+    new OnJoinFunctionAssigner[L, R](ds)
+
+  /**
+   * Enriches a [[CrossDataSet]] with `projecting`.
+   *
+   * @param ds The cross data set to wrap
+   * @tparam L The type of the left data set items
+   * @tparam R The type of the right data set items
+   */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[L, R](ds: CrossDataSet[L, R]): OnCrossDataSet[L, R] =
+    new OnCrossDataSet[L, R](ds)
+
+  /**
+   * Enriches a [[GroupedDataSet]] with `sortGroupWith`, `reduceWith`, `reduceGroupWith`
+   * and `combineGroupWith`.
+   *
+   * @param ds The grouped data set to wrap
+   * @tparam T The type of the data set items
+   */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[T](ds: GroupedDataSet[T]): OnGroupedDataSet[T] =
+    new OnGroupedDataSet[T](ds)
+
+  /**
+   * Enriches a [[CoGroupDataSet]] with `projecting`.
+   *
+   * @param ds The co-group data set to wrap
+   * @tparam L The type of the left data set items
+   * @tparam R The type of the right data set items
+   */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[L, R](ds: CoGroupDataSet[L, R]): OnCoGroupDataSet[L, R] =
+    new OnCoGroupDataSet[L, R](ds)
+
+  /**
+   * Enriches a [[HalfUnfinishedKeyPairOperation]] with `isEqualTo`.
+   *
+   * @param ds The half-unfinished key pair operation to wrap
+   * @tparam L The type of the left data set items
+   * @tparam R The type of the right data set items
+   * @tparam O The type of the operation obtained once both keys are defined
+   */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[L, R, O](
+      ds: HalfUnfinishedKeyPairOperation[L, R, O]): OnHalfUnfinishedKeyPairOperation[L, R, O] =
+    new OnHalfUnfinishedKeyPairOperation[L, R, O](ds)
+
+  /**
+   * Enriches an [[UnfinishedKeyPairOperation]] with `whereClause`.
+   *
+   * @param ds The unfinished key pair operation to wrap
+   * @tparam L The type of the left data set items
+   * @tparam R The type of the right data set items
+   * @tparam O The type of the operation obtained once both keys are defined
+   */
+  @PublicEvolving
+  implicit def acceptPartialFunctions[L, R, O](
+      ds: UnfinishedKeyPairOperation[L, R, O]): OnUnfinishedKeyPairOperation[L, R, O] =
+    new OnUnfinishedKeyPairOperation[L, R, O](ds)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala
new file mode 100644
index 0000000..c2e13fe
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/base/AcceptPFTestBase.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.base
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.apache.flink.util.TestLogger
+import org.scalatest.junit.JUnitSuiteLike
+
+/**
+ * Shared fixtures for the tests of the `acceptPartialFunctions` extension: two small data
+ * sets (one of tuples, one of case class instances) and their groupings by numeric key.
+ */
+private[extensions] abstract class AcceptPFTestBase extends TestLogger with JUnitSuiteLike {
+
+  private val env = ExecutionEnvironment.getExecutionEnvironment
+
+  protected val tuples: DataSet[(Int, String)] =
+    env.fromElements((1, "hello"), (2, "world"))
+
+  protected val caseObjects: DataSet[KeyValuePair] =
+    env.fromElements(KeyValuePair(1, "hello"), KeyValuePair(2, "world"))
+
+  protected val groupedTuples: GroupedDataSet[(Int, String)] =
+    tuples.groupBy(pair => pair._1)
+
+  protected val groupedCaseObjects: GroupedDataSet[KeyValuePair] =
+    caseObjects.groupBy(kv => kv.id)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/data/KeyValuePair.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/data/KeyValuePair.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/data/KeyValuePair.scala
new file mode 100644
index 0000000..6d02393
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/data/KeyValuePair.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.data
+
+/**
+ * Minimal case class used as the element type of the `acceptPartialFunctions` test
+ * data sets, so that pattern matching on case class instances can be exercised.
+ *
+ * @param id Numeric key of the pair
+ * @param value Text associated with the key
+ */
+private[extensions] case class KeyValuePair(id: Int, value: String)
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala
new file mode 100644
index 0000000..a20f977
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCoGroupDataSetTest.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.java.operators.CoGroupOperator
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+/**
+ * Checks that `projecting` on a co-group, whose keys are given through `whereClause`
+ * and `isEqualTo`, builds a `CoGroupOperator` for tuples and for case class instances.
+ */
+class OnCoGroupDataSetTest extends AcceptPFTestBase {
+
+  @Test
+  def testCoGroupProjectingOnTuple(): Unit = {
+    val coGrouped = tuples
+      .coGroup(tuples)
+      .whereClause { case (id, _) => id }
+      .isEqualTo { case (id, _) => id }
+      .projecting { case ((_, v1) #:: _, (_, v2) #:: _) => s"$v1 $v2" }
+    assert(coGrouped.javaSet.isInstanceOf[CoGroupOperator[_, _, _]],
+      "projecting on tuples should produce a CoGroupOperator")
+  }
+
+  @Test
+  def testCoGroupProjectingOnCaseClass(): Unit = {
+    val coGrouped = caseObjects
+      .coGroup(caseObjects)
+      .whereClause { case KeyValuePair(id, _) => id }
+      .isEqualTo { case KeyValuePair(id, _) => id }
+      .projecting { case (KeyValuePair(_, v1) #:: _, KeyValuePair(_, v2) #:: _) => s"$v1 $v2" }
+    assert(coGrouped.javaSet.isInstanceOf[CoGroupOperator[_, _, _]],
+      "projecting on case objects should produce a CoGroupOperator")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala
new file mode 100644
index 0000000..650c9ab
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnCrossDataSetTest.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.java.operators.CrossOperator
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+/**
+ * Checks that `projecting` on the cross product of two data sets builds a
+ * `CrossOperator`, for tuples and for case class instances.
+ */
+class OnCrossDataSetTest extends AcceptPFTestBase {
+
+  @Test
+  def testCrossProjectingOnTuple(): Unit = {
+    val crossed = tuples.cross(tuples).projecting { case ((_, v1), (_, v2)) => s"$v1 $v2" }
+    assert(crossed.javaSet.isInstanceOf[CrossOperator[_, _, _]],
+      "projecting for cross on tuples should produce a CrossOperator")
+  }
+
+  @Test
+  def testCrossProjectingOnCaseClass(): Unit = {
+    val crossed = caseObjects.cross(caseObjects).projecting {
+      case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+    }
+    assert(crossed.javaSet.isInstanceOf[CrossOperator[_, _, _]],
+      "projecting for cross on case objects should produce a CrossOperator")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala
new file mode 100644
index 0000000..d4d6244
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSetTest.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.java.operators._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+/**
+ * Checks that each operator added to [[DataSet]] by the `acceptPartialFunctions`
+ * extension builds the expected Java operator (e.g. `mapWith` builds a `MapOperator`),
+ * both when decomposing tuples and when decomposing case class instances.
+ */
+class OnDataSetTest extends AcceptPFTestBase {
+
+  @Test
+  def testMapWithOnTuple(): Unit = {
+    val test =
+      tuples.mapWith {
+        case (id, value) => s"$id $value"
+      }
+    assert(test.javaSet.isInstanceOf[MapOperator[_, _]],
+      "mapWith should produce a MapOperator")
+  }
+
+  @Test
+  def testMapWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.mapWith {
+        case KeyValuePair(id, value) => s"$id $value"
+      }
+    assert(test.javaSet.isInstanceOf[MapOperator[_, _]],
+      "mapWith should produce a MapOperator")
+  }
+
+  @Test
+  def testMapPartitionWithOnTuple(): Unit = {
+    val test =
+      tuples.mapPartitionWith {
+        case (id, value) #:: _ => s"$id $value"
+      }
+    assert(test.javaSet.isInstanceOf[MapPartitionOperator[_, _]],
+      "mapPartitionWith should produce a MapPartitionOperator")
+  }
+
+  @Test
+  def testMapPartitionWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.mapPartitionWith {
+        case KeyValuePair(id, value) #:: _ => s"$id $value"
+      }
+    assert(test.javaSet.isInstanceOf[MapPartitionOperator[_, _]],
+      "mapPartitionWith should produce a MapPartitionOperator")
+  }
+
+  @Test
+  def testFlatMapWithOnTuple(): Unit = {
+    val test =
+      tuples.flatMapWith {
+        case (id, value) => List(id.toString, value)
+      }
+    assert(test.javaSet.isInstanceOf[FlatMapOperator[_, _]],
+      "flatMapWith should produce a FlatMapOperator")
+  }
+
+  @Test
+  def testFlatMapWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.flatMapWith {
+        case KeyValuePair(id, value) => List(id.toString, value)
+      }
+    assert(test.javaSet.isInstanceOf[FlatMapOperator[_, _]],
+      "flatMapWith should produce a FlatMapOperator")
+  }
+
+  @Test
+  def testFilterWithOnTuple(): Unit = {
+    val test =
+      tuples.filterWith {
+        // only the key is inspected, so the value is not bound
+        case (id, _) => id == 1
+      }
+    assert(test.javaSet.isInstanceOf[FilterOperator[_]],
+      "filterWith should produce a FilterOperator")
+  }
+
+  @Test
+  def testFilterWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.filterWith {
+        // only the key is inspected, so the value is not bound
+        case KeyValuePair(id, _) => id == 1
+      }
+    assert(test.javaSet.isInstanceOf[FilterOperator[_]],
+      "filterWith should produce a FilterOperator")
+  }
+
+  @Test
+  def testReduceWithOnTuple(): Unit = {
+    val test =
+      tuples.reduceWith {
+        case ((_, v1), (_, v2)) => (0, s"$v1 $v2")
+      }
+    assert(test.javaSet.isInstanceOf[ReduceOperator[_]],
+      "reduceWith should produce a ReduceOperator")
+  }
+
+  @Test
+  def testReduceWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.reduceWith {
+        case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2")
+      }
+    assert(test.javaSet.isInstanceOf[ReduceOperator[_]],
+      "reduceWith should produce a ReduceOperator")
+  }
+
+  @Test
+  def testReduceGroupWithOnTuple(): Unit = {
+    val accumulator: StringBuffer = new StringBuffer()
+    val test =
+      tuples.reduceGroupWith {
+        case (_, value) #:: _ => accumulator.append(value).append('\n')
+      }
+    assert(test.javaSet.isInstanceOf[GroupReduceOperator[_, _]],
+      "reduceGroupWith should produce a GroupReduceOperator")
+  }
+
+  @Test
+  def testReduceGroupWithOnCaseClass(): Unit = {
+    val accumulator: StringBuffer = new StringBuffer()
+    val test =
+      caseObjects.reduceGroupWith {
+        case KeyValuePair(_, value) #:: _ => accumulator.append(value).append('\n')
+      }
+    assert(test.javaSet.isInstanceOf[GroupReduceOperator[_, _]],
+      "reduceGroupWith should produce a GroupReduceOperator")
+  }
+
+  @Test
+  def testGroupingByOnTuple(): Unit = {
+    val test =
+      tuples.groupingBy {
+        case (id, _) => id
+      }
+    // NOTE(review): if `groupingBy` is declared to return GroupedDataSet, this assertion is
+    // tautological and the test mainly checks that the pattern-matching call compiles.
+    assert(test.isInstanceOf[GroupedDataSet[_]],
+      "groupingBy should produce a GroupedDataSet")
+  }
+
+  @Test
+  def testGroupingByOnCaseClass(): Unit = {
+    val test =
+      caseObjects.groupingBy {
+        case KeyValuePair(id, _) => id
+      }
+    // NOTE(review): if `groupingBy` is declared to return GroupedDataSet, this assertion is
+    // tautological and the test mainly checks that the pattern-matching call compiles.
+    assert(test.isInstanceOf[GroupedDataSet[_]],
+      "groupingBy should produce a GroupedDataSet")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala
new file mode 100644
index 0000000..898c4b0
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSetTest.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.java.operators._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+/**
+ * Checks the operators that the `acceptPartialFunctions` extension adds to
+ * [[GroupedDataSet]], on grouped tuples and on grouped case class instances.
+ */
+class OnGroupedDataSetTest extends AcceptPFTestBase {
+
+  @Test
+  def testSortGroupWithOnTuple(): Unit = {
+    val sorted = groupedTuples.sortGroupWith(Order.ASCENDING) { case (id, _) => id }
+    // NOTE(review): likely guaranteed by the static type; mainly a compile check.
+    assert(sorted.isInstanceOf[GroupedDataSet[_]],
+      "sortGroupWith on tuples should produce a GroupedDataSet")
+  }
+
+  @Test
+  def testSortGroupWithOnCaseClass(): Unit = {
+    val sorted = groupedCaseObjects.sortGroupWith(Order.ASCENDING) { case KeyValuePair(id, _) => id }
+    // NOTE(review): likely guaranteed by the static type; mainly a compile check.
+    assert(sorted.isInstanceOf[GroupedDataSet[_]],
+      "sortGroupWith on case objects should produce a GroupedDataSet")
+  }
+
+  @Test
+  def testReduceWithOnTuple(): Unit = {
+    val reduced = groupedTuples.reduceWith { case ((_, v1), (_, v2)) => (0, s"$v1 $v2") }
+    assert(reduced.javaSet.isInstanceOf[ReduceOperator[_]],
+      "reduceWith on tuples should produce a ReduceOperator")
+  }
+
+  @Test
+  def testReduceWithOnCaseClass(): Unit = {
+    val reduced = groupedCaseObjects.reduceWith {
+      case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2")
+    }
+    assert(reduced.javaSet.isInstanceOf[ReduceOperator[_]],
+      "reduceWith on case objects should produce a ReduceOperator")
+  }
+
+  @Test
+  def testReduceGroupWithOnTuple(): Unit = {
+    val buffer = new StringBuffer()
+    val reduced = groupedTuples.reduceGroupWith {
+      case (_, value) #:: _ => buffer.append(value).append('\n')
+    }
+    assert(reduced.javaSet.isInstanceOf[GroupReduceOperator[_, _]],
+      "reduceGroupWith on tuples should produce a GroupReduceOperator")
+  }
+
+  @Test
+  def testReduceGroupWithOnCaseClass(): Unit = {
+    val buffer = new StringBuffer()
+    val reduced = groupedCaseObjects.reduceGroupWith {
+      case KeyValuePair(_, value) #:: _ => buffer.append(value).append('\n')
+    }
+    assert(reduced.javaSet.isInstanceOf[GroupReduceOperator[_, _]],
+      "reduceGroupWith on case objects should produce a GroupReduceOperator")
+  }
+
+  @Test
+  def testCombineGroupWithOnTuple(): Unit = {
+    val buffer = new StringBuffer()
+    val combined = groupedTuples.combineGroupWith {
+      case (_, value) #:: _ => buffer.append(value).append('\n')
+    }
+    assert(combined.javaSet.isInstanceOf[GroupCombineOperator[_, _]],
+      "combineGroupWith on tuples should produce a GroupCombineOperator")
+  }
+
+  @Test
+  def testCombineGroupWithOnCaseClass(): Unit = {
+    val buffer = new StringBuffer()
+    val combined = groupedCaseObjects.combineGroupWith {
+      case KeyValuePair(_, value) #:: _ => buffer.append(value).append('\n')
+    }
+    assert(combined.javaSet.isInstanceOf[GroupCombineOperator[_, _]],
+      "combineGroupWith on case objects should produce a GroupCombineOperator")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala
new file mode 100644
index 0000000..dca2208
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnHalfUnfinishedKeyPairOperationTest.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.java.operators.CoGroupOperator
+import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+/**
+ * Checks that calling `isEqualTo` on the result of `whereClause` yields the expected
+ * operator (inner join, co-group) or function assigner (outer joins), both for
+ * tuples and for case class instances. No job is executed: only the resulting
+ * types are inspected.
+ */
+class OnHalfUnfinishedKeyPairOperationTest extends AcceptPFTestBase {
+
+ @Test
+ def testInnerJoinIsEqualToOnTuple(): Unit = {
+ val test =
+ tuples.join(tuples).whereClause {
+ case (id, _) => id
+ }.isEqualTo {
+ case (id, _) => id
+ }
+ assert(test.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+ "isEqualTo for inner join on tuples should produce an EquiJoin")
+ }
+
+ @Test
+ def testInnerJoinIsEqualToOnCaseClass(): Unit = {
+ val test =
+ caseObjects.join(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }.isEqualTo {
+ case KeyValuePair(id, _) => id
+ }
+ assert(test.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+ "isEqualTo for inner join on case objects should produce an EquiJoin")
+ }
+
+ // NOTE(review): for the outer-join variants below, `test` is presumably already
+ // statically typed as a JoinFunctionAssigner, in which case these isInstanceOf
+ // checks cannot fail; consider asserting on the operator produced by a
+ // subsequent projection instead.
+
+ @Test
+ def testRightOuterJoinIsEqualToOnTuple(): Unit = {
+ val test =
+ tuples.rightOuterJoin(tuples).whereClause {
+ case (id, _) => id
+ }.isEqualTo {
+ case (id, _) => id
+ }
+ assert(test.isInstanceOf[JoinFunctionAssigner[_, _]],
+ "isEqualTo for right outer join on tuples should produce a JoinFunctionAssigner")
+ }
+
+ @Test
+ def testRightOuterJoinIsEqualToOnCaseClass(): Unit = {
+ val test =
+ caseObjects.rightOuterJoin(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }.isEqualTo {
+ case KeyValuePair(id, _) => id
+ }
+ assert(test.isInstanceOf[JoinFunctionAssigner[_, _]],
+ "isEqualTo for right outer join on case objects should produce a JoinFunctionAssigner")
+ }
+
+ @Test
+ def testLeftOuterJoinIsEqualToOnTuple(): Unit = {
+ val test =
+ tuples.leftOuterJoin(tuples).whereClause {
+ case (id, _) => id
+ }.isEqualTo {
+ case (id, _) => id
+ }
+ assert(test.isInstanceOf[JoinFunctionAssigner[_, _]],
+ "isEqualTo for left outer join on tuples should produce a JoinFunctionAssigner")
+ }
+
+ @Test
+ def testLeftOuterJoinIsEqualToOnCaseClass(): Unit = {
+ val test =
+ caseObjects.leftOuterJoin(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }.isEqualTo {
+ case KeyValuePair(id, _) => id
+ }
+ assert(test.isInstanceOf[JoinFunctionAssigner[_, _]],
+ "isEqualTo for left outer join on case objects should produce a JoinFunctionAssigner")
+ }
+
+ @Test
+ def testFullOuterJoinIsEqualToOnTuple(): Unit = {
+ val test =
+ tuples.fullOuterJoin(tuples).whereClause {
+ case (id, _) => id
+ }.isEqualTo {
+ case (id, _) => id
+ }
+ assert(test.isInstanceOf[JoinFunctionAssigner[_, _]],
+ "isEqualTo for full outer join on tuples should produce a JoinFunctionAssigner")
+ }
+
+ @Test
+ def testFullOuterJoinIsEqualToOnCaseClass(): Unit = {
+ val test =
+ caseObjects.fullOuterJoin(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }.isEqualTo {
+ case KeyValuePair(id, _) => id
+ }
+ assert(test.isInstanceOf[JoinFunctionAssigner[_, _]],
+ "isEqualTo for full outer join on case objects should produce a JoinFunctionAssigner")
+ }
+
+ @Test
+ def testCoGroupIsEqualToOnTuple(): Unit = {
+ val test =
+ tuples.coGroup(tuples).whereClause {
+ case (id, _) => id
+ }.isEqualTo {
+ case (id, _) => id
+ }
+ assert(test.javaSet.isInstanceOf[CoGroupOperator[_, _, _]],
+ "isEqualTo for co-group on tuples should produce a CoGroupOperator")
+ }
+
+ @Test
+ def testCoGroupIsEqualToOnCaseClass(): Unit = {
+ val test =
+ caseObjects.coGroup(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }.isEqualTo {
+ case KeyValuePair(id, _) => id
+ }
+ assert(test.javaSet.isInstanceOf[CoGroupOperator[_, _, _]],
+ "isEqualTo for co-group on case objects should produce a CoGroupOperator")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala
new file mode 100644
index 0000000..52e31ae
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnJoinFunctionAssignerTest.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+/**
+ * Checks that `projecting`, applied after `whereClause`/`isEqualTo`, produces an
+ * `EquiJoin` operator for inner, right outer, left outer and full outer joins, both
+ * for tuples and for case class instances. No job is executed: only the type of the
+ * underlying Java operator is inspected.
+ *
+ * NOTE(review): in the outer-join cases one side of a match is presumably absent
+ * (null) at runtime; the projection patterns below do not handle that. They are
+ * never invoked here, so this only matters if these tests are extended to run jobs.
+ */
+class OnJoinFunctionAssignerTest extends AcceptPFTestBase {
+
+ @Test
+ def testInnerJoinProjectingOnTuple(): Unit = {
+ val test =
+ tuples.join(tuples).whereClause {
+ case (id, _) => id
+ }.isEqualTo {
+ case (id, _) => id
+ }.projecting {
+ case ((_, v1), (_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+ "projecting inner join on tuples should produce an EquiJoin")
+ }
+
+ @Test
+ def testInnerJoinProjectingOnCaseClass(): Unit = {
+ val test =
+ caseObjects.join(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }.isEqualTo {
+ case KeyValuePair(id, _) => id
+ }.projecting {
+ case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+ "projecting inner join on case objects should produce an EquiJoin")
+ }
+
+ @Test
+ def testRightOuterJoinProjectingOnTuple(): Unit = {
+ val test =
+ tuples.rightOuterJoin(tuples).whereClause {
+ case (id, _) => id
+ }.isEqualTo {
+ case (id, _) => id
+ }.projecting {
+ case ((_, v1), (_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+ "projecting right outer join on tuples should produce an EquiJoin")
+ }
+
+ @Test
+ def testRightOuterJoinProjectingOnCaseClass(): Unit = {
+ val test =
+ caseObjects.rightOuterJoin(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }.isEqualTo {
+ case KeyValuePair(id, _) => id
+ }.projecting {
+ case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+ "projecting right outer join on case objects should produce an EquiJoin")
+ }
+
+ @Test
+ def testLeftOuterJoinProjectingOnTuple(): Unit = {
+ val test =
+ tuples.leftOuterJoin(tuples).whereClause {
+ case (id, _) => id
+ }.isEqualTo {
+ case (id, _) => id
+ }.projecting {
+ case ((_, v1), (_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+ "projecting left outer join on tuples should produce an EquiJoin")
+ }
+
+ @Test
+ def testLeftOuterJoinProjectingOnCaseClass(): Unit = {
+ val test =
+ caseObjects.leftOuterJoin(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }.isEqualTo {
+ case KeyValuePair(id, _) => id
+ }.projecting {
+ case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+ "projecting left outer join on case objects should produce an EquiJoin")
+ }
+
+ @Test
+ def testFullOuterJoinProjectingOnTuple(): Unit = {
+ val test =
+ tuples.fullOuterJoin(tuples).whereClause {
+ case (id, _) => id
+ }.isEqualTo {
+ case (id, _) => id
+ }.projecting {
+ case ((_, v1), (_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+ "projecting full outer join on tuples should produce an EquiJoin")
+ }
+
+ @Test
+ def testFullOuterJoinProjectingOnCaseClass(): Unit = {
+ val test =
+ caseObjects.fullOuterJoin(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }.isEqualTo {
+ case KeyValuePair(id, _) => id
+ }.projecting {
+ case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaSet.isInstanceOf[EquiJoin[_, _, _]],
+ "projecting full outer join on case objects should produce an EquiJoin")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala
new file mode 100644
index 0000000..b454699
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnUnfinishedKeyPairOperationTest.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+/**
+ * Checks that `whereClause`, applied to the unfinished key-pair operations returned
+ * by `join`, `rightOuterJoin`, `leftOuterJoin`, `fullOuterJoin` and `coGroup`, yields
+ * a [[HalfUnfinishedKeyPairOperation]], both for tuples and for case class instances.
+ * Each test builds its operation with the method named in the test, so every join
+ * flavour is actually exercised.
+ */
+class OnUnfinishedKeyPairOperationTest extends AcceptPFTestBase {
+
+ @Test
+ def testInnerJoinWhereClauseOnTuple(): Unit = {
+ val test =
+ tuples.join(tuples).whereClause {
+ case (id, _) => id
+ }
+ assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+ "whereClause for inner join on tuples should produce a HalfUnfinishedKeyPairOperation")
+ }
+
+ @Test
+ def testInnerJoinWhereClauseOnCaseClass(): Unit = {
+ val test =
+ caseObjects.join(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }
+ assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+ "whereClause for inner join on case objects " +
+ "should produce a HalfUnfinishedKeyPairOperation")
+ }
+
+ @Test
+ def testRightOuterJoinWhereClauseOnTuple(): Unit = {
+ val test =
+ tuples.rightOuterJoin(tuples).whereClause {
+ case (id, _) => id
+ }
+ assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+ "whereClause for right outer join on tuples " +
+ "should produce a HalfUnfinishedKeyPairOperation")
+ }
+
+ @Test
+ def testRightOuterJoinWhereClauseOnCaseClass(): Unit = {
+ val test =
+ caseObjects.rightOuterJoin(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }
+ assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+ "whereClause for right outer join on case objects " +
+ "should produce a HalfUnfinishedKeyPairOperation")
+ }
+
+ @Test
+ def testLeftOuterJoinWhereClauseOnTuple(): Unit = {
+ val test =
+ tuples.leftOuterJoin(tuples).whereClause {
+ case (id, _) => id
+ }
+ assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+ "whereClause for left outer join on tuples " +
+ "should produce a HalfUnfinishedKeyPairOperation")
+ }
+
+ @Test
+ def testLeftOuterJoinWhereClauseOnCaseClass(): Unit = {
+ val test =
+ caseObjects.leftOuterJoin(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }
+ assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+ "whereClause for left outer join on case objects " +
+ "should produce a HalfUnfinishedKeyPairOperation")
+ }
+
+ @Test
+ def testFullOuterJoinWhereClauseOnTuple(): Unit = {
+ val test =
+ tuples.fullOuterJoin(tuples).whereClause {
+ case (id, _) => id
+ }
+ assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+ "whereClause for full outer join on tuples " +
+ "should produce a HalfUnfinishedKeyPairOperation")
+ }
+
+ @Test
+ def testFullOuterJoinWhereClauseOnCaseClass(): Unit = {
+ val test =
+ caseObjects.fullOuterJoin(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }
+ assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+ "whereClause for full outer join on case objects " +
+ "should produce a HalfUnfinishedKeyPairOperation")
+ }
+
+ @Test
+ def testCoGroupWhereClauseOnTuple(): Unit = {
+ val test =
+ tuples.coGroup(tuples).whereClause {
+ case (id, _) => id
+ }
+ assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+ "whereClause for co-group on tuples " +
+ "should produce a HalfUnfinishedKeyPairOperation")
+ }
+
+ @Test
+ def testCoGroupWhereClauseOnCaseClass(): Unit = {
+ val test =
+ caseObjects.coGroup(caseObjects).whereClause {
+ case KeyValuePair(id, _) => id
+ }
+ assert(test.isInstanceOf[HalfUnfinishedKeyPairOperation[_, _, _]],
+ "whereClause for co-group on case objects " +
+ "should produce a HalfUnfinishedKeyPairOperation")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 669f12e..141625e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -18,14 +18,13 @@
package org.apache.flink.streaming.api.scala
-import org.apache.flink.annotation.{PublicEvolving, Internal, Public}
+import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream, SingleOutputStreamOperator, KeyedStream}
+import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
-import org.apache.flink.streaming.api.transformations.TwoInputTransformation
import org.apache.flink.util.Collector
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index 4d09dae..93b5cc8 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -73,7 +73,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
* A join operation that has a [[KeySelector]] defined for the first input.
*
* You need to specify a [[KeySelector]] for the second input using [[equalTo()]]
- * before you can proceeed with specifying a [[WindowAssigner]] using [[EqualTo.window()]].
+ * before you can proceed with specifying a [[WindowAssigner]] using [[EqualTo.window()]].
*
* @tparam KEY Type of the key. This must be the same for both inputs
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
new file mode 100644
index 0000000..deb03a3
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream}
+
+/**
+ * Wraps a [[ConnectedStreams]] so that its transformations accept anonymous partial
+ * functions (`{ case ... => ... }`), which lets callers destructure tuples, case class
+ * instances or collections directly in the function literal.
+ *
+ * @param stream The connected streams being wrapped
+ * @tparam IN1 Element type of the first connected stream
+ * @tparam IN2 Element type of the second connected stream
+ */
+class OnConnectedStream[IN1, IN2](stream: ConnectedStreams[IN1, IN2]) {
+
+ /**
+ * Applies a CoMap transformation to the connected streams: `map1` is called for
+ * every element of the first connected stream and `map2` for every element of the
+ * second one. Delegates to `ConnectedStreams.map`.
+ *
+ * @param map1 Function applied to each element of the first input.
+ * @param map2 Function applied to each element of the second input.
+ * @tparam R Element type of the resulting stream.
+ * @return The data stream produced by the two functions.
+ */
+ @PublicEvolving
+ def mapWith[R: TypeInformation](map1: IN1 => R, map2: IN2 => R): DataStream[R] = {
+ stream.map(map1, map2)
+ }
+
+ /**
+ * Applies a CoFlatMap transformation to the connected streams: `flatMap1` is called
+ * for every element of the first connected stream and `flatMap2` for every element of
+ * the second one; each call may emit any number of results. Delegates to
+ * `ConnectedStreams.flatMap`.
+ *
+ * @param flatMap1 Function applied to each element of the first input.
+ * @param flatMap2 Function applied to each element of the second input.
+ * @tparam R Element type of the resulting stream.
+ * @return The data stream containing all elements emitted by the two functions.
+ */
+ @PublicEvolving
+ def flatMapWith[R: TypeInformation](
+ flatMap1: IN1 => TraversableOnce[R],
+ flatMap2: IN2 => TraversableOnce[R]): DataStream[R] = {
+ stream.flatMap(flatMap1, flatMap2)
+ }
+
+ /**
+ * Keys both connected streams, so that elements of either stream that share a key
+ * are routed to the same parallel instance of the downstream transformation
+ * functions. Delegates to `ConnectedStreams.keyBy`.
+ *
+ * @param key1 Key extractor for the first stream.
+ * @param key2 Key extractor for the second stream.
+ * @tparam K1 Key type of the first stream.
+ * @tparam K2 Key type of the second stream.
+ * @return The key-partitioned connected streams.
+ */
+ @PublicEvolving
+ def keyingBy[K1: TypeInformation, K2: TypeInformation](
+ key1: IN1 => K1,
+ key2: IN2 => K2): ConnectedStreams[IN1, IN2] = {
+ stream.keyBy(key1, key2)
+ }
+
+}