You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by stefanobaghino <gi...@git.apache.org> on 2016/02/24 13:08:25 UTC

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

GitHub user stefanobaghino opened a pull request:

    https://github.com/apache/flink/pull/1704

    [FLINK-1159] Case style anonymous functions not supported by Scala API

    The proposed API extension methods would allow developers to pass a pattern matching anonymous function so that they are applied on a `DataSet` or `DataStream`; many methods defined on the `DataSet` and `DataStream` APIs don't support those functions due to the overloading[[1]][[2]]; pattern matching anonymous functions allow a very idiomatic approach in Scala to *decompose tuples, case classes and collections*.
    
    The PR does not pollute the original `DataSet` and `DataStream` APIs but is provided as an optional set of extensions methods, implemented via implicit conversions and made available to the developer by explicitly importing the required package, e.g.:
    
    ```scala
    import org.apache.flink.api.scala.extensions.acceptPartialFunctions
    env.fromElements('a -> 1, 'b -> 2).mapWith {
      case (key, value) =>
        ... // key and value are now available and have a sensible name
    }
    ```
    
    [1]: https://groups.google.com/d/msg/scala-user/3oHnDEl1UsM/dDNir9BsiG4J
    [2]: http://www.scala-lang.org/files/archive/spec/2.11/06-expressions.html#overloading-resolution

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/radicalbit/flink 1159

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1704.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1704
    
----
commit 61d184075cce9fa4f3a8e477634adef083d3a070
Author: Stefano Baghino <st...@baghino.me>
Date:   2016-02-24T12:05:16Z

    [FLINK-1159] Case style anonymous functions not supported by Scala API

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-201325574
  
    @tillrohrmann I moved forward on the batch extensions: I added support for anonymous partial functions on `where` and `equalsTo` for joins and co-group operations. I also deleted the old tests and provided a full set of unit tests for each method under the same source root (`flink-scala`).
    
    Now I have to work on the streaming and adjust the Scaladoc and documentation. I hope next week I'll have something ready to close this PR. Let me know if you prefer to have the work done so far pushed on this PR already.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53945929
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataSet with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    --- End diff --
    
    This won't work. You have to import `org.apache.flink.api.scala.extensions.acceptPartialFunctions._`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-192259307
  
    @tillrohrmann Yes, I've already fixed them locally, thanks for the notice and for pinging me. I'm working on the documentation, sorry if it's taking me a little bit longer than expected but I had little time this week. I want to complete the PR by Sunday evening.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-191799798
  
    There are scalastyle violations in the code:
    
    ```
    error file=/home/travis/build/apache/flink/flink-tests/src/test/scala/org/apache/flink/api/scala/extensions/AcceptPFFilterITCase.scala message=File must end with newline character
    error file=/home/travis/build/apache/flink/flink-tests/src/test/scala/org/apache/flink/api/scala/extensions/AcceptPFFlatMapITCase.scala message=File must end with newline character
    error file=/home/travis/build/apache/flink/flink-tests/src/test/scala/org/apache/flink/api/scala/extensions/AcceptPFMapITCase.scala message=File must end with newline character
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-188343795
  
    :+1: @StephanEwen I started working on the extension before the removal, will fix this as well, thanks for the feedback.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53946030
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataSet with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    +  *     case class Point(x: Double, y: Double)
    +  *     def main(args: Array[String]): Unit = {
    +  *       val env = ExecutionEnvironment.getExecutionEnvironment
    +  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +  *       ds.filterWith {
    +  *         case Point(x, _) => x > 1
    +  *       }.reduceWith {
    +  *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +  *       }.mapWith {
    +  *         case Point(x, y) => (x, y)
    +  *       }.flatMapWith {
    +  *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +  *       }.groupingBy {
    --- End diff --
    
    Sorry forget my comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-196314763
  
    @stefanobaghino, really good work :-) I think we're close to get this merged. I had some minor comments.
    
    Concerning the testing, I agree with @StephanEwen that it's not really necessary to execute a complete program for each new extension method. Instead, I think it is sufficient to check that the right operator/stream transformation has been instantiated. The operators and stream transformations should be already well tested. For the `DataSet` methods you could do something like
    
    ```
    val identityMapDs = ds.mapWith(identity)
    assertTrue(identityMapDs.javaSet.isInstanceOf[MapOperator[String, String]])
    ```
    
    And for the `DataStream` methods
    
    ```
    val identity = stream.mapWith{case x => x}
    assertTrue(identity.javaStream.getTransformation.asInstanceOf[OneInputTransformation[Int, Int]]
          .getOperator.isInstanceOf[StreamMap[Int, Int]])
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-188301093
  
    Thanks @tillrohrmann, I'll fix the errors in the comments and add the missing methods and extensions.
    
    Regarding the import mode, I agree with you. I started off with just one `DataSet` but then had to support many, I'll try to go back to the original design while retaining multiple ´DataSet´ subtype extensions.
    
    The only thing I'm not very convinced of from your snippet is about returning the extended `DataStreamWithPartialFunctionSupport` class. Implicit conversions have no runtime cost, so maybe it would be better to just return the `DataSet` to make the code more compact and readable. What do you think?
    
    Regarding the tests: I actually have some tests in place locally but basically they're just copies of the original tests on the operators. Should I commit them as well?
    
    Regarding the docs: absolutely, I wasn't sure about this. Where do you think it would be better to put them? Should I add a new chapter under the programming guides?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55995289
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/package.scala ---
    @@ -0,0 +1,202 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions._
    +import org.apache.flink.streaming.api.windowing.windows.Window
    +
    +package object extensions {
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataStream with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.keyingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnDataStream[T: TypeInformation](ds: DataStream[T]):
    +      OnDataStream[T] =
    +    new OnDataStream[T](ds)
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataStream with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.keyingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnKeyedStream[T: TypeInformation, K](ds: KeyedStream[T, K]):
    +      OnKeyedStream[T, K] =
    +    new OnKeyedStream[T, K](ds)
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataStream with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.keyingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnJoinedStream
    +      [L: TypeInformation, R: TypeInformation, K, W <: Window](
    +      ds: JoinedStreams[L, R]#Where[K]#EqualTo#WithWindow[W]) =
    +    new OnJoinedStream[L, R, K, W](ds)
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataStream with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.keyingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnConnectedStream[IN1: TypeInformation, IN2: TypeInformation](
    +      ds: ConnectedStreams[IN1, IN2]) =
    +    new OnConnectedStream[IN1, IN2](ds)
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataStream with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.keyingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnWindowedStream[T, K, W <: Window](
    +      ds: WindowedStream[T, K, W]) =
    +    new OnWindowedStream[T, K, W](ds)
    +
    --- End diff --
    
    The same comments as for the extensions for the `DataSet` apply here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55995777
  
    --- Diff: docs/apis/scala_api_extensions.md ---
    @@ -0,0 +1,392 @@
    +---
    +title: "Scala API Extensions"
    +# Top-level navigation
    +top-nav-group: apis
    +top-nav-pos: 11
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +In order to keep a fair amount of consistency between the Scala and Java APIs, some 
    +of the features that allow a high-level of expressiveness in Scala have been left
    +out from the standard APIs for both batch and streaming.
    +
    +If you want to _enjoy the full Scala experience_ you can choose to opt-in to 
    +extensions that enhance the Scala API via implicit conversions.
    +
    +To use all the available extensions, you can just add a simple `import` for the
    +DataSet API
    +
    +{% highlight scala %}
    +import org.apache.flink.api.scala.extensions._
    +{% endhighlight %}
    +
    +or the DataStream API
    +
    +{% highlight scala %}
    +import org.apache.flink.streaming.api.scala.extensions._
    +{% endhighlight %}
    +
    +Alternatively, you can import individual extensions _a-là-carte_ to only use those
    +you prefer.
    +
    +## Accept partial functions
    +
    +Normally, both the DataSet and DataStream APIs don't accept anonymous pattern
    +matching functions to deconstruct tuples, case classes or collections, like the
    +following:
    +
    +{% highlight scala %}
    +val data: DataSet[(Int, String, Double)] = // [...]
    +data.map {
    +  case (id, name, temperature) => // [...]
    +  // The previous line causes the following compilation error:
    +  // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
    +}
    +{% endhighlight %}
    +
    +This extension introduces new methods in both the DataSet and DataStream Scala API
    +that have a one-to-one correspondance in the extended API. These delegating methods 
    +do support anonymous pattern matching functions.
    +
    +#### DataSet API
    +
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 20%">Method</th>
    +      <th class="text-left" style="width: 20%">Original</th>
    +      <th class="text-center">Example</th>
    +    </tr>
    +  </thead>
    +
    +  <tbody>
    +    <tr>
    +      <td><strong>mapWith</strong></td>
    +      <td><strong>map (DataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.mapWith {
    +  case (_, value) => value.toString
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>mapPartitionWith</strong></td>
    +      <td><strong>mapPartition (DataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.mapPartitionWith {
    +  case head +: _ => head
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>flatMapWith</strong></td>
    +      <td><strong>flatMap (DataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.flatMapWith {
    +  case (_, name, visitTimes) => visitTimes.map(name -> _)
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>filterWith</strong></td>
    +      <td><strong>filter (DataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.filterWith {
    +  case Train(_, isOnTime) => isOnTime
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>reduceWith</strong></td>
    +      <td><strong>reduce (DataSet, GroupedDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.reduceWith {
    +  case ((_, amount1), (_, amount2)) => amount1 + amount2
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>reduceGroupWith</strong></td>
    +      <td><strong>reduceGroup (GroupedDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.reduceGroupWith {
    +  case id +: value +: _ => id -> value
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>groupingBy</strong></td>
    +      <td><strong>groupBy (DataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.groupingBy {
    +  case (id, _, _) => id
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>sortGroupWith</strong></td>
    +      <td><strong>sortGroup (GroupedDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +grouped.sortGroupWith(Order.ASCENDING) {
    +  case House(_, value) => value
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>combineGroupWith</strong></td>
    +      <td><strong>combineGroup (GroupedDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +grouped.combineGroupWith {
    +  case header +: amounts => amounts.sum
    +}
    +{% endhighlight %}
    +      </td>
    +    <tr>
    +      <td><strong>projecting</strong></td>
    +      <td><strong>apply (JoinDataSet, CrossDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data1.join(data2).where(0).equalTo(1).projecting {
    +  case ((pk, tx), (products, fk)) => tx -> products
    +}
    +
    +data1.cross(data2).projecting {
    +  case ((a, _), (_, b) => a -> b
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>projecting</strong></td>
    +      <td><strong>apply (CoGroupDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data1.coGroup(data2).where(0).equalTo(1).projecting {
    +  case (head1 +: _, head2 +: _) => head1 -> head2
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    </tr>
    +  </tbody>
    +</table>
    +
    +#### DataStream API
    +
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 20%">Method</th>
    +      <th class="text-left" style="width: 20%">Original</th>
    +      <th class="text-center">Example</th>
    +    </tr>
    +  </thead>
    +
    +  <tbody>
    +    <tr>
    +      <td><strong>mapWith</strong></td>
    +      <td><strong>map (DataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.mapWith {
    +  case (_, value) => value.toString
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>mapPartitionWith</strong></td>
    +      <td><strong>mapPartition (DataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.mapPartitionWith {
    +  case head +: _ => head
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>flatMapWith</strong></td>
    +      <td><strong>flatMap (DataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.flatMapWith {
    +  case (_, name, visits) => visits.map(name -> _)
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>filterWith</strong></td>
    +      <td><strong>filter (DataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.filterWith {
    +  case Train(_, isOnTime) => isOnTime
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>keyingBy</strong></td>
    +      <td><strong>keyBy (DataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.keyingBy {
    +  case (id, _, _) => id
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>mapWith</strong></td>
    +      <td><strong>map (ConnectedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.mapWith(
    +  map1 = case (_, value) => value.toString,
    +  map2 = case (_, _, value, _) => value + 1
    +)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>flatMapWith</strong></td>
    +      <td><strong>flatMap (ConnectedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.flatMapWith(
    +  flatMap1 = case (_, json) => parse(json),
    +  flatMap2 = case (_, _, json, _) => parse(json)
    +)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>keyingBy</strong></td>
    +      <td><strong>keyBy (ConnectedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.keyingBy(
    +  key1 = case (_, timestamp) => timestamp,
    +  key2 = case (id, _, _) => id
    +)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>reduceWith</strong></td>
    +      <td><strong>reduce (KeyedDataStream, WindowedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.reduceWith {
    +  case ((_, sum1), (_, sum2) => sum1 + sum2
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>foldWith</strong></td>
    +      <td><strong>fold (KeyedDataStream, WindowedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.foldWith(User(bought = 0)) {
    +  case (User(b), (_, items)) => User(b + items.size)
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>applyWith</strong></td>
    +      <td><strong>apply (WindowedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.applyWith(0)(
    +  foldFunction = case (sum, amount) => sum + amount
    +  windowFunction = case (k, w, sum) => // [...]
    +)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>projecting</strong></td>
    +      <td><strong>apply (JoinedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data1.join(data2).where(0).equalTo(1).projecting {
    +  case ((pk, tx), (products, fk)) => tx -> products
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +  </tbody>
    +</table>
    +
    +
    +
    +For more information on the semantics of each method, please refer to the 
    +[DataStream](batch/index.html) and [DataSet](streaming/index.html) API documentation.
    +
    +To use this extension exclusively, you can add the following `import`:
    +
    +{% highlight scala %}
    +import org.apache.flink.api.scala.extensions.acceptPartialFunctions
    --- End diff --
    
    Yes, I wrote the docs before testing and rewriting the method signatures; good catch, thanks. I'll try to find a way to make a single import for all `acceptPartialFunctions` methods (see my reply to the next comment).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55993670
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/OnGroupedDataSet.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.operators.Order
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
    +
    +import scala.reflect.ClassTag
    +
    +class OnGroupedDataSet[T: ClassTag](ds: GroupedDataSet[T]) {
    +
    +  /**
    +    * Sorts a group using a sorting function `fun` and an `Order`
    +    *
    +    * @param fun The sorting function, defining the sorting key
    +    * @param order The ordering strategy (ascending, descending, etc.)
    +    * @tparam K The key type
    +    * @return A data set sorted group-wise
    +    */
    +  def sortGroupWith[K: TypeInformation](order: Order)(fun: T => K): GroupedDataSet[T] =
    +    ds.sortGroup(fun, order)
    +
    +  /**
    +    * Reduces the whole data set with a reducer `fun`
    +    *
    +    * @param fun The reducing function
    +    * @return A reduced data set of Ts
    +    */
    +  def reduceWith(fun: (T, T) => T): DataSet[T] =
    +    ds.reduce(fun)
    +
    +  /**
    +    * Reduces the data set group-wise with a reducer `fun`
    +    *
    +    * @param fun The reducing function
    +    * @tparam R The type of the items in the resulting data set
    +    * @return A data set of Rs reduced group-wise
    +    */
    +  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +    ds.reduceGroup {
    +      (it, out) =>
    +        out.collect(fun(it.to[Seq]))
    +    }
    +
    +  /**
    +    * Same as a reducing operation but only acts locally,
    +    * ideal to perform pre-aggregation before a reduction.
    +    *
    +    * @param fun The reducing function
    +    * @tparam R The type of the items in the resulting data set
    +    * @return A data set of Rs reduced group-wise
    +    */
    +  def combineGroupWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +    ds.combineGroup {
    +      (it, out) =>
    +        out.collect(fun(it.to[Seq]))
    --- End diff --
    
    Materialization?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53945878
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataSet with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    +  *     case class Point(x: Double, y: Double)
    +  *     def main(args: Array[String]): Unit = {
    +  *       val env = ExecutionEnvironment.getExecutionEnvironment
    +  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +  *       ds.filterWith {
    +  *         case Point(x, _) => x > 1
    +  *       }.reduceWith {
    +  *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +  *       }.mapWith {
    +  *         case Point(x, y) => (x, y)
    +  *       }.flatMapWith {
    +  *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +  *       }.groupingBy {
    --- End diff --
    
    Isn't it called `keyingBy`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-202778390
  
    Sounds great @stefanobaghino. I think you can push your work to this PR as well since it is all related to the partial function support. Looking forward having partial function support :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r58244518
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Wraps a data set, allowing to use anonymous partial functions to
    +  * perform extraction of items in a tuple, case class instance or collection
    +  *
    +  * @param ds The wrapped data set
    +  * @tparam T The type of the data set items, for which the type information must be known
    +  */
    +class OnDataSet[T: TypeInformation](ds: DataSet[T]) {
    --- End diff --
    
    I think you do not need the `TypeInformation` context bound here. Functions should never need the implicit type info for the input (that is contained in the DataSet already), only ever for the return type.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-195818369
  
    Thanks for the feedback. Basically the tests so far have been a carbon copy of the ones run on the operators I delegate to from the extensions. I agree that this approach is a little bit heavy and would like to make it lighter but perhaps I can use some help on this.
    
    I've made a few changes that perhaps go in the right direction in my latest commit ([source here](https://github.com/radicalbit/flink/blob/3e6c58c84c977299d05ef9c9675ecda31368731a/flink-tests/src/test/scala/org/apache/flink/api/scala/extensions/AcceptPFMapITCase.scala)).
    
    Here's a snippet:
    
    ```scala
    class AcceptPFMapITCase extends TestLogger {
    
      private val optimizer = new Optimizer(new Configuration)
      private val unusedResultPath = "UNUSED"
    
      def isTransformation(n: PlanNode): Boolean =
        !n.isInstanceOf[SourcePlanNode] && !n.isInstanceOf[SinkPlanNode]
    
      private def getOptimizerTransformationNodes(env: ExecutionEnvironment): Iterable[OptimizerNode] =
        for {
          node <- optimizer.compile(env.createProgramPlan("UNUSED")).getAllNodes
          transformation = node.getOptimizerNode if isTransformation(node)
        } yield transformation
    
      @Test
      def testIdentityMapperWithBasicType(): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val ds = CollectionDataSets.getStringDataSet(env)
        val identityMapDs = ds.mapWith(identity)
        identityMapDs.writeAsText(unusedResultPath, WriteMode.OVERWRITE)
        val nodes = getOptimizerTransformationNodes(env)
        assertTrue("The plan should contain 1 transformation", nodes.size == 1)
        assertTrue("The transformation should be a map", nodes.forall(_.isInstanceOf[MapNode]))
      }
    
      /* [...] */
    ```
    
    If this approach sounds fine, I can apply it to the other tests as well, along with a wider coverage of the extension methods.
    
    Apart from the approach in general, I have a couple of doubts:
    * is it safe to share an `Optimizer` among several tests (i.e. no state involved)?
    * is it enough to test the transformation or should I test the input and output types as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-188335368
  
    Looks very nice in my opinion.
    
    Could you remove the `ClassTag` context bounds? We recently removed them from `DataStream`, because they are not needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-204500365
  
    Looks pretty good. I noticed three remaining things:
    
      - Sometimes, the package and the directory trees are different (example: `OnCoGroupDataSetTest.scala`). While Scala allows that, we usually keep them in sync.
    
      - The classes that implement the "withX" functions sometimes have context bounds that I think they should not require (see inline comment above). Would be great to remove them, because removing them later makes it API breaking.
    
      - We need to decide how to label this for the future: stable, or evolving. I would suggest to use `@PublicEvolving` on all the involved classes for now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53948074
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataSet with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    +  *     case class Point(x: Double, y: Double)
    +  *     def main(args: Array[String]): Unit = {
    +  *       val env = ExecutionEnvironment.getExecutionEnvironment
    +  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +  *       ds.filterWith {
    +  *         case Point(x, _) => x > 1
    +  *       }.reduceWith {
    +  *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +  *       }.mapWith {
    +  *         case Point(x, y) => (x, y)
    +  *       }.flatMapWith {
    +  *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +  *       }.groupingBy {
    +  *         case (id, value) => id
    +  *       }
    +  *     }
    +  *   }
    +  * }}}
    +  *
    +  */
    +package object acceptPartialFunctions {
    +
    +  implicit class OnDataSet[T: TypeInformation](ds: DataSet[T]) {
    +
    +    /**
    +      * Applies a function `fun` to each item of the data set
    +      *
    +      * @param fun The function to be applied to each item
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of R
    +      */
    +    def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
    +      ds.map(fun)
    +
    +    /**
    +      * Applies a function `fun` to a partition as a whole
    +      *
    +      * @param fun The function to be applied on the whole partition
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of R
    +      */
    +    def mapPartitionWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +      ds.mapPartition {
    +        (it, out) =>
    +          out.collect(fun(it.to[Seq]))
    +      }
    +
    +    /**
    +      * Applies a function `fun` to each item of the dataset, producing a collection of items
    +      * that will be flattened in the resulting data set
    +      *
    +      * @param fun The function to be applied to each item
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of R
    +      */
    +    def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R] =
    +      ds.flatMap(fun)
    +
    +    /**
    +      * Applies a predicate `fun` to each item of the data set, keeping only those for which
    +      * the predicate holds
    +      *
    +      * @param fun The predicate to be tested on each item
    +      * @return A dataset of R
    +      */
    +    def filterWith(fun: T => Boolean): DataSet[T] =
    +      ds.filter(fun)
    +
    +    /**
    +      * Applies a reducer `fun` to the data set
    +      *
    +      * @param fun The reducing function to be applied on the whole data set
    +      * @tparam R The type of the items in the returned collection
    +      * @return A data set of Rs
    +      */
    +    def reduceWith[R: TypeInformation: ClassTag](fun: (T, T) => T): DataSet[T] =
    +      ds.reduce(fun)
    +
    +    /**
    +      * Applies a reducer `fun` to a grouped data set
    +      *
    +      * @param fun The function to be applied to the whole grouping
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of Rs
    +      */
    +    def reduceGroupWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +      ds.reduceGroup {
    +        (it, out) =>
    +          out.collect(fun(it.to[Seq]))
    +      }
    +
    +    /**
    +      * Groups the items according to a grouping function `fun`
    +      *
    +      * @param fun The grouping function
    +      * @tparam K The return type of the grouping function, for which type information must be known
    +      * @return A grouped data set of Ts
    +      */
    +    def groupingBy[K: TypeInformation: ClassTag](fun: T => K): GroupedDataSet[T] =
    +      ds.groupBy(fun)
    +
    +  }
    +
    +  implicit class OnJoinDataSet[L: TypeInformation, R: TypeInformation](
    +      dataset: JoinDataSet[L, R]) {
    +
    +    /**
    +      * Joins the data sets using the function `fun` to project elements from both in the
    +      * resulting data set
    +      *
    +      * @param fun The function that defines the projection of the join
    +      * @tparam O The return type of the projection, for which type information must be known
    +      * @return A fully joined data set of Os
    +      */
    +    def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] =
    +      dataset(fun)
    +
    +  }
    +
    +  implicit class OnCoGroupDataSet[L: TypeInformation, R: TypeInformation](
    +      dataset: CoGroupDataSet[L, R]) {
    +
    +    /**
    +      * Co-groups the data sets using the function `fun` to project elements from both in
    +      * the resulting data set
    +      *
    +      * @param fun The function that defines the projection of the co-group operation
    +      * @tparam O The return type of the projection, for which type information must be known
    +      * @return A fully co-grouped data set of Os
    +      */
    +    def projecting[O: TypeInformation: ClassTag](fun: (Seq[L], Seq[R]) => O): DataSet[O] =
    +      dataset {
    +        (left, right) =>
    +          fun(left.to[Seq], right.to[Seq])
    +      }
    +
    +  }
    --- End diff --
    
    `GroupedDataSet` is missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-204334976
  
    Note on the tests of the streaming extensions: I couldn't find a more specific class than `SingleOutputStreamOperator[_]`, thus assertion may not be very meaningful. However, I'd leave them in place to make sure further work on the extension won't break them at compile time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1704


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53948137
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataSet with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    +  *     case class Point(x: Double, y: Double)
    +  *     def main(args: Array[String]): Unit = {
    +  *       val env = ExecutionEnvironment.getExecutionEnvironment
    +  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +  *       ds.filterWith {
    +  *         case Point(x, _) => x > 1
    +  *       }.reduceWith {
    +  *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +  *       }.mapWith {
    +  *         case Point(x, y) => (x, y)
    +  *       }.flatMapWith {
    +  *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +  *       }.groupingBy {
    +  *         case (id, value) => id
    +  *       }
    +  *     }
    +  *   }
    +  * }}}
    +  *
    +  */
    +package object acceptPartialFunctions {
    +
    +  implicit class OnDataSet[T: TypeInformation](ds: DataSet[T]) {
    +
    +    /**
    +      * Applies a function `fun` to each item of the data set
    +      *
    +      * @param fun The function to be applied to each item
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of R
    +      */
    +    def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
    +      ds.map(fun)
    +
    +    /**
    +      * Applies a function `fun` to a partition as a whole
    +      *
    +      * @param fun The function to be applied on the whole partition
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of R
    +      */
    +    def mapPartitionWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +      ds.mapPartition {
    +        (it, out) =>
    +          out.collect(fun(it.to[Seq]))
    +      }
    +
    +    /**
    +      * Applies a function `fun` to each item of the dataset, producing a collection of items
    +      * that will be flattened in the resulting data set
    +      *
    +      * @param fun The function to be applied to each item
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of R
    +      */
    +    def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R] =
    +      ds.flatMap(fun)
    +
    +    /**
    +      * Applies a predicate `fun` to each item of the data set, keeping only those for which
    +      * the predicate holds
    +      *
    +      * @param fun The predicate to be tested on each item
    +      * @return A dataset of R
    +      */
    +    def filterWith(fun: T => Boolean): DataSet[T] =
    +      ds.filter(fun)
    +
    +    /**
    +      * Applies a reducer `fun` to the data set
    +      *
    +      * @param fun The reducing function to be applied on the whole data set
    +      * @tparam R The type of the items in the returned collection
    +      * @return A data set of Rs
    +      */
    +    def reduceWith[R: TypeInformation: ClassTag](fun: (T, T) => T): DataSet[T] =
    +      ds.reduce(fun)
    +
    +    /**
    +      * Applies a reducer `fun` to a grouped data set
    +      *
    +      * @param fun The function to be applied to the whole grouping
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of Rs
    +      */
    +    def reduceGroupWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +      ds.reduceGroup {
    +        (it, out) =>
    +          out.collect(fun(it.to[Seq]))
    +      }
    +
    +    /**
    +      * Groups the items according to a grouping function `fun`
    +      *
    +      * @param fun The grouping function
    +      * @tparam K The return type of the grouping function, for which type information must be known
    +      * @return A grouped data set of Ts
    +      */
    +    def groupingBy[K: TypeInformation: ClassTag](fun: T => K): GroupedDataSet[T] =
    +      ds.groupBy(fun)
    +
    +  }
    +
    +  implicit class OnJoinDataSet[L: TypeInformation, R: TypeInformation](
    +      dataset: JoinDataSet[L, R]) {
    +
    +    /**
    +      * Joins the data sets using the function `fun` to project elements from both in the
    +      * resulting data set
    +      *
    +      * @param fun The function that defines the projection of the join
    +      * @tparam O The return type of the projection, for which type information must be known
    +      * @return A fully joined data set of Os
    +      */
    +    def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] =
    +      dataset(fun)
    +
    +  }
    +
    +  implicit class OnCoGroupDataSet[L: TypeInformation, R: TypeInformation](
    +      dataset: CoGroupDataSet[L, R]) {
    +
    +    /**
    +      * Co-groups the data sets using the function `fun` to project elements from both in
    +      * the resulting data set
    +      *
    +      * @param fun The function that defines the projection of the co-group operation
    +      * @tparam O The return type of the projection, for which type information must be known
    +      * @return A fully co-grouped data set of Os
    +      */
    +    def projecting[O: TypeInformation: ClassTag](fun: (Seq[L], Seq[R]) => O): DataSet[O] =
    +      dataset {
    +        (left, right) =>
    +          fun(left.to[Seq], right.to[Seq])
    +      }
    +
    +  }
    --- End diff --
    
    `CrossDataSet` is missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-188320230
  
    How do you want to do the implicit conversion if you return a `DataSet[T]`. If it makes the code more readable, then I think it's a good idea :-)
    
    We should have a test, which makes sure that the implicit conversion works when you import the corresponding package. That should be enough.
    
    I think we could add the documentation to the streaming guide. We could add a new page with the methods and link it from the scala tab of [DataStream Transformations](https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#datastream-transformations) in the streaming guide.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-188790808
  
    @StephanEwen I had to restore some of the context bounds on `ClassTag` to make it compile, apparently the delegated methods use them; I've rebased with the latest changes on the master before putting them back in place.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55999131
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.extensions.acceptPartialFunctions._
    +
    +import scala.reflect.ClassTag
    +
    +package object extensions {
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataSet with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = ExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.groupingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnDataSet[T: TypeInformation](ds: DataSet[T]): OnDataSet[T] =
    --- End diff --
    
    Ah ok, I didn't know that Scala would have problems with implicits and method overloading. If it is not possible or it gets too complicated, then simply leave it as it is. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53948314
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataSet with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    +  *     case class Point(x: Double, y: Double)
    +  *     def main(args: Array[String]): Unit = {
    +  *       val env = ExecutionEnvironment.getExecutionEnvironment
    +  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +  *       ds.filterWith {
    +  *         case Point(x, _) => x > 1
    +  *       }.reduceWith {
    +  *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +  *       }.mapWith {
    +  *         case Point(x, y) => (x, y)
    +  *       }.flatMapWith {
    +  *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +  *       }.groupingBy {
    +  *         case (id, value) => id
    +  *       }
    +  *     }
    +  *   }
    +  * }}}
    +  *
    +  */
    +package object acceptPartialFunctions {
    +
    +  implicit class OnDataSet[T: TypeInformation](ds: DataSet[T]) {
    +
    +    /**
    +      * Applies a function `fun` to each item of the data set
    +      *
    +      * @param fun The function to be applied to each item
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of R
    +      */
    +    def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
    +      ds.map(fun)
    +
    +    /**
    +      * Applies a function `fun` to a partition as a whole
    +      *
    +      * @param fun The function to be applied on the whole partition
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of R
    +      */
    +    def mapPartitionWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +      ds.mapPartition {
    +        (it, out) =>
    +          out.collect(fun(it.to[Seq]))
    +      }
    +
    +    /**
    +      * Applies a function `fun` to each item of the dataset, producing a collection of items
    +      * that will be flattened in the resulting data set
    +      *
    +      * @param fun The function to be applied to each item
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of R
    +      */
    +    def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R] =
    +      ds.flatMap(fun)
    +
    +    /**
    +      * Applies a predicate `fun` to each item of the data set, keeping only those for which
    +      * the predicate holds
    +      *
    +      * @param fun The predicate to be tested on each item
    +      * @return A dataset of R
    +      */
    +    def filterWith(fun: T => Boolean): DataSet[T] =
    +      ds.filter(fun)
    +
    +    /**
    +      * Applies a reducer `fun` to the data set
    +      *
    +      * @param fun The reducing function to be applied on the whole data set
    +      * @tparam R The type of the items in the returned collection
    +      * @return A data set of Rs
    +      */
    +    def reduceWith[R: TypeInformation: ClassTag](fun: (T, T) => T): DataSet[T] =
    +      ds.reduce(fun)
    +
    +    /**
    +      * Applies a reducer `fun` to a grouped data set
    +      *
    +      * @param fun The function to be applied to the whole grouping
    +      * @tparam R The type of the items in the returned data set
    +      * @return A dataset of Rs
    +      */
    +    def reduceGroupWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +      ds.reduceGroup {
    +        (it, out) =>
    +          out.collect(fun(it.to[Seq]))
    +      }
    +
    +    /**
    +      * Groups the items according to a grouping function `fun`
    +      *
    +      * @param fun The grouping function
    +      * @tparam K The return type of the grouping function, for which type information must be known
    +      * @return A grouped data set of Ts
    +      */
    +    def groupingBy[K: TypeInformation: ClassTag](fun: T => K): GroupedDataSet[T] =
    +      ds.groupBy(fun)
    +
    --- End diff --
    
    `combineGroupWith` is not supported.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55993580
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/OnDataSet.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{GroupedDataSet, DataSet}
    +
    +import scala.reflect.ClassTag
    +
    +class OnDataSet[T: TypeInformation](ds: DataSet[T]) {
    +
    +  /**
    +    * Applies a function `fun` to each item of the data set
    +    *
    +    * @param fun The function to be applied to each item
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of R
    +    */
    +  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
    +    ds.map(fun)
    +
    +  /**
    +    * Applies a function `fun` to a partition as a whole
    +    *
    +    * @param fun The function to be applied on the whole partition
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of R
    +    */
    +  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +    ds.mapPartition {
    +      (it, out) =>
    +        out.collect(fun(it.to[Seq]))
    --- End diff --
    
    Does `it.to[Seq]` materializes the `iterator`? If so, then this is not so good because you can run out of memory.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-189232475
  
    @stefanobaghino Ah, yes, for `DataSet` you need the `ClassTag`, for `DataStream` they should not be needed...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-192522625
  
    I've squashed the commits, I don't know what I was thinking about. :disappointed: 
    This should more or less be it, I'd just like to add more tests to cover all the operators.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-199840360
  
    Your 2. proposal looks more natural to me because what you receive as input to a user function is in fact a stream of data. So if this solution is feasible, then I would vote for it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-188289888
  
    It would also be great to add some tests to make sure that the import is working. Furthermore, it would be great to add documentation for the extension feature.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53950863
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.scala.{JoinedStreams, CoGroupedStreams, KeyedStream, DataStream}
    +import org.apache.flink.streaming.api.windowing.windows.Window
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataStream with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    +  *     case class Point(x: Double, y: Double)
    +  *     def main(args: Array[String]): Unit = {
    +  *       val env = StreamExecutionEnvironment.getExecutionEnvironment
    +  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +  *       ds.filterWith {
    +  *         case Point(x, _) => x > 1
    +  *       }.reduceWith {
    +  *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +  *       }.mapWith {
    +  *         case Point(x, y) => (x, y)
    +  *       }.flatMapWith {
    +  *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +  *       }.groupingBy {
    --- End diff --
    
    Copy/paste error, good catch! I'll fix this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-195290267
  
    @tillrohrmann The tests failure seem to be flaky, I've re-run them all on our fork and they're all green now (after a couple of retries).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55993999
  
    --- Diff: docs/apis/scala_api_extensions.md ---
    @@ -0,0 +1,392 @@
    +---
    +title: "Scala API Extensions"
    +# Top-level navigation
    +top-nav-group: apis
    +top-nav-pos: 11
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +In order to keep a fair amount of consistency between the Scala and Java APIs, some 
    +of the features that allow a high-level of expressiveness in Scala have been left
    +out from the standard APIs for both batch and streaming.
    +
    +If you want to _enjoy the full Scala experience_ you can choose to opt-in to 
    +extensions that enhance the Scala API via implicit conversions.
    +
    +To use all the available extensions, you can just add a simple `import` for the
    +DataSet API
    +
    +{% highlight scala %}
    +import org.apache.flink.api.scala.extensions._
    +{% endhighlight %}
    +
    +or the DataStream API
    +
    +{% highlight scala %}
    +import org.apache.flink.streaming.api.scala.extensions._
    +{% endhighlight %}
    +
    +Alternatively, you can import individual extensions _a-là-carte_ to only use those
    +you prefer.
    +
    +## Accept partial functions
    +
    +Normally, both the DataSet and DataStream APIs don't accept anonymous pattern
    +matching functions to deconstruct tuples, case classes or collections, like the
    +following:
    +
    +{% highlight scala %}
    +val data: DataSet[(Int, String, Double)] = // [...]
    +data.map {
    +  case (id, name, temperature) => // [...]
    +  // The previous line causes the following compilation error:
    +  // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
    +}
    +{% endhighlight %}
    +
    +This extension introduces new methods in both the DataSet and DataStream Scala API
    +that have a one-to-one correspondance in the extended API. These delegating methods 
    +do support anonymous pattern matching functions.
    +
    +#### DataSet API
    +
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 20%">Method</th>
    +      <th class="text-left" style="width: 20%">Original</th>
    +      <th class="text-center">Example</th>
    +    </tr>
    +  </thead>
    +
    +  <tbody>
    +    <tr>
    +      <td><strong>mapWith</strong></td>
    +      <td><strong>map (DataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.mapWith {
    +  case (_, value) => value.toString
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>mapPartitionWith</strong></td>
    +      <td><strong>mapPartition (DataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.mapPartitionWith {
    +  case head +: _ => head
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>flatMapWith</strong></td>
    +      <td><strong>flatMap (DataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.flatMapWith {
    +  case (_, name, visitTimes) => visitTimes.map(name -> _)
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>filterWith</strong></td>
    +      <td><strong>filter (DataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.filterWith {
    +  case Train(_, isOnTime) => isOnTime
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>reduceWith</strong></td>
    +      <td><strong>reduce (DataSet, GroupedDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.reduceWith {
    +  case ((_, amount1), (_, amount2)) => amount1 + amount2
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>reduceGroupWith</strong></td>
    +      <td><strong>reduceGroup (GroupedDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.reduceGroupWith {
    +  case id +: value +: _ => id -> value
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>groupingBy</strong></td>
    +      <td><strong>groupBy (DataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.groupingBy {
    +  case (id, _, _) => id
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>sortGroupWith</strong></td>
    +      <td><strong>sortGroup (GroupedDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +grouped.sortGroupWith(Order.ASCENDING) {
    +  case House(_, value) => value
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>combineGroupWith</strong></td>
    +      <td><strong>combineGroup (GroupedDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +grouped.combineGroupWith {
    +  case header +: amounts => amounts.sum
    +}
    +{% endhighlight %}
    +      </td>
    +    <tr>
    +      <td><strong>projecting</strong></td>
    +      <td><strong>apply (JoinDataSet, CrossDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data1.join(data2).where(0).equalTo(1).projecting {
    +  case ((pk, tx), (products, fk)) => tx -> products
    +}
    +
    +data1.cross(data2).projecting {
    +  case ((a, _), (_, b) => a -> b
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>projecting</strong></td>
    +      <td><strong>apply (CoGroupDataSet)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data1.coGroup(data2).where(0).equalTo(1).projecting {
    +  case (head1 +: _, head2 +: _) => head1 -> head2
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    </tr>
    +  </tbody>
    +</table>
    +
    +#### DataStream API
    +
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 20%">Method</th>
    +      <th class="text-left" style="width: 20%">Original</th>
    +      <th class="text-center">Example</th>
    +    </tr>
    +  </thead>
    +
    +  <tbody>
    +    <tr>
    +      <td><strong>mapWith</strong></td>
    +      <td><strong>map (DataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.mapWith {
    +  case (_, value) => value.toString
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>mapPartitionWith</strong></td>
    +      <td><strong>mapPartition (DataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.mapPartitionWith {
    +  case head +: _ => head
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>flatMapWith</strong></td>
    +      <td><strong>flatMap (DataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.flatMapWith {
    +  case (_, name, visits) => visits.map(name -> _)
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>filterWith</strong></td>
    +      <td><strong>filter (DataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.filterWith {
    +  case Train(_, isOnTime) => isOnTime
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>keyingBy</strong></td>
    +      <td><strong>keyBy (DataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.keyingBy {
    +  case (id, _, _) => id
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>mapWith</strong></td>
    +      <td><strong>map (ConnectedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.mapWith(
    +  map1 = case (_, value) => value.toString,
    +  map2 = case (_, _, value, _) => value + 1
    +)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>flatMapWith</strong></td>
    +      <td><strong>flatMap (ConnectedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.flatMapWith(
    +  flatMap1 = case (_, json) => parse(json),
    +  flatMap2 = case (_, _, json, _) => parse(json)
    +)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>keyingBy</strong></td>
    +      <td><strong>keyBy (ConnectedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.keyingBy(
    +  key1 = case (_, timestamp) => timestamp,
    +  key2 = case (id, _, _) => id
    +)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>reduceWith</strong></td>
    +      <td><strong>reduce (KeyedDataStream, WindowedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.reduceWith {
    +  case ((_, sum1), (_, sum2) => sum1 + sum2
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>foldWith</strong></td>
    +      <td><strong>fold (KeyedDataStream, WindowedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.foldWith(User(bought = 0)) {
    +  case (User(b), (_, items)) => User(b + items.size)
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>applyWith</strong></td>
    +      <td><strong>apply (WindowedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data.applyWith(0)(
    +  foldFunction = case (sum, amount) => sum + amount
    +  windowFunction = case (k, w, sum) => // [...]
    +)
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><strong>projecting</strong></td>
    +      <td><strong>apply (JoinedDataStream)</strong></td>
    +      <td>
    +{% highlight scala %}
    +data1.join(data2).where(0).equalTo(1).projecting {
    +  case ((pk, tx), (products, fk)) => tx -> products
    +}
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +  </tbody>
    +</table>
    +
    +
    +
    +For more information on the semantics of each method, please refer to the 
    +[DataStream](batch/index.html) and [DataSet](streaming/index.html) API documentation.
    +
    +To use this extension exclusively, you can add the following `import`:
    +
    +{% highlight scala %}
    +import org.apache.flink.api.scala.extensions.acceptPartialFunctions
    --- End diff --
    
    Does this really work? Don't you have to import `o.a.f.api.scala.extensions.acceptPartialFunctionsOnDataSet` etc.?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55994195
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.extensions.acceptPartialFunctions._
    +
    +import scala.reflect.ClassTag
    +
    +package object extensions {
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataSet with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = ExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.groupingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnDataSet[T: TypeInformation](ds: DataSet[T]): OnDataSet[T] =
    --- End diff --
    
    I'm wondering whether we shouldn't overload the method `acceptPartialFunctions` with the different data set types instead of having a different method name for the different data sets (grouped, normal, etc.). I don't think that the user will want to enable partial function support on such a fine grained scale. Either he wants partial function support or not.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55996335
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/OnDataSet.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{GroupedDataSet, DataSet}
    +
    +import scala.reflect.ClassTag
    +
    +class OnDataSet[T: TypeInformation](ds: DataSet[T]) {
    +
    +  /**
    +    * Applies a function `fun` to each item of the data set
    +    *
    +    * @param fun The function to be applied to each item
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of R
    +    */
    +  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
    +    ds.map(fun)
    +
    +  /**
    +    * Applies a function `fun` to a partition as a whole
    +    *
    +    * @param fun The function to be applied on the whole partition
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of R
    +    */
    +  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +    ds.mapPartition {
    +      (it, out) =>
    +        out.collect(fun(it.to[Seq]))
    --- End diff --
    
    [Yes, it does.](http://www.scala-lang.org/api/current/index.html#scala.collection.Traversable@to[Col[_]]:Col[A]) I thought in this context I was fairly safe from OOMs but I'll refactor to make it work on each item of the collection individually. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53947242
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.scala.{JoinedStreams, CoGroupedStreams, KeyedStream, DataStream}
    +import org.apache.flink.streaming.api.windowing.windows.Window
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataStream with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    +  *     case class Point(x: Double, y: Double)
    +  *     def main(args: Array[String]): Unit = {
    +  *       val env = StreamExecutionEnvironment.getExecutionEnvironment
    +  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +  *       ds.filterWith {
    +  *         case Point(x, _) => x > 1
    +  *       }.reduceWith {
    +  *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +  *       }.mapWith {
    +  *         case Point(x, y) => (x, y)
    +  *       }.flatMapWith {
    +  *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +  *       }.groupingBy {
    +  *         case (id, value) => id
    +  *       }
    +  *     }
    +  *   }
    +  * }}}
    +  *
    +  */
    +package object acceptPartialFunctions {
    --- End diff --
    
    I think the `ConnectedStreams` and `WindowedStreams` are missing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55995217
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/OnWindowedStream.scala ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.scala.{DataStream, WindowedStream}
    +import org.apache.flink.streaming.api.windowing.windows.Window
    +
    +class OnWindowedStream[T, K, W <: Window](ds: WindowedStream[T, K, W]) {
    +
    +  /**
    +    * Applies a reduce function to the window. The window function is called for each evaluation
    +    * of the window for each key individually. The output of the reduce function is interpreted
    +    * as a regular non-windowed stream.
    +    *
    +    * This window will try and pre-aggregate data as much as the window policies permit.
    +    * For example,tumbling time windows can perfectly pre-aggregate the data, meaning that only one
    +    * element per key is stored. Sliding time windows will pre-aggregate on the granularity of the
    +    * slide interval, so a few elements are stored per key (one per slide interval).
    +    * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
    +    * aggregation tree.
    +    *
    +    * @param function The reduce function.
    +    * @return The data stream that is the result of applying the reduce function to the window.
    +    */
    +  def reduceWith(function: (T, T) => T) =
    +    ds.reduce(function)
    +
    +  /**
    +    * Applies the given fold function to each window. The window function is called for each
    +    * evaluation of the window for each key individually. The output of the reduce function is
    +    * interpreted as a regular non-windowed stream.
    +    *
    +    * @param function The fold function.
    +    * @return The data stream that is the result of applying the fold function to the window.
    +    */
    +  def foldWith[R: TypeInformation](initialValue: R)(function: (R, T) => R) =
    +    ds.fold(initialValue)(function)
    +
    +  /**
    +    * Applies the given window function to each window. The window function is called for each
    +    * evaluation of the window for each key individually. The output of the window function is
    +    * interpreted as a regular non-windowed stream.
    +    *
    +    * Arriving data is incrementally aggregated using the given fold function.
    +    *
    +    * @param initialValue The initial value of the fold
    +    * @param foldFunction The fold function that is used for incremental aggregation
    +    * @param windowFunction The window function.
    +    * @return The data stream that is the result of applying the window function to the window.
    +    */
    +  def applyWith[R: TypeInformation](initialValue: R)
    +                                   (foldFunction: (R, T) => R,
    +                                    windowFunction: (K, W, R) => TraversableOnce[R]):
    --- End diff --
    
    Why does the `windowFunction` work on a single `R` element and not on all elements of a window?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55998933
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/OnDataSet.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{GroupedDataSet, DataSet}
    +
    +import scala.reflect.ClassTag
    +
    +class OnDataSet[T: TypeInformation](ds: DataSet[T]) {
    +
    +  /**
    +    * Applies a function `fun` to each item of the data set
    +    *
    +    * @param fun The function to be applied to each item
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of R
    +    */
    +  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
    +    ds.map(fun)
    +
    +  /**
    +    * Applies a function `fun` to a partition as a whole
    +    *
    +    * @param fun The function to be applied on the whole partition
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of R
    +    */
    +  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +    ds.mapPartition {
    +      (it, out) =>
    +        out.collect(fun(it.to[Seq]))
    --- End diff --
    
    @stefanobaghino, the idea of `mapPartition` is to process all elements but also to have the possibility to access them all from within one mapPartition call. Therefore, it should get an `Iterator[T]` as input instead of iterating over the iterator and calling the mapPartition for each element.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55993657
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/OnGroupedDataSet.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.operators.Order
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
    +
    +import scala.reflect.ClassTag
    +
    +class OnGroupedDataSet[T: ClassTag](ds: GroupedDataSet[T]) {
    +
    +  /**
    +    * Sorts a group using a sorting function `fun` and an `Order`
    +    *
    +    * @param fun The sorting function, defining the sorting key
    +    * @param order The ordering strategy (ascending, descending, etc.)
    +    * @tparam K The key type
    +    * @return A data set sorted group-wise
    +    */
    +  def sortGroupWith[K: TypeInformation](order: Order)(fun: T => K): GroupedDataSet[T] =
    +    ds.sortGroup(fun, order)
    +
    +  /**
    +    * Reduces the whole data set with a reducer `fun`
    +    *
    +    * @param fun The reducing function
    +    * @return A reduced data set of Ts
    +    */
    +  def reduceWith(fun: (T, T) => T): DataSet[T] =
    +    ds.reduce(fun)
    +
    +  /**
    +    * Reduces the data set group-wise with a reducer `fun`
    +    *
    +    * @param fun The reducing function
    +    * @tparam R The type of the items in the resulting data set
    +    * @return A data set of Rs reduced group-wise
    +    */
    +  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +    ds.reduceGroup {
    +      (it, out) =>
    +        out.collect(fun(it.to[Seq]))
    --- End diff --
    
    Materialization?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53946101
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.scala.{JoinedStreams, CoGroupedStreams, KeyedStream, DataStream}
    +import org.apache.flink.streaming.api.windowing.windows.Window
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataStream with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    +  *     case class Point(x: Double, y: Double)
    +  *     def main(args: Array[String]): Unit = {
    +  *       val env = StreamExecutionEnvironment.getExecutionEnvironment
    +  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +  *       ds.filterWith {
    +  *         case Point(x, _) => x > 1
    +  *       }.reduceWith {
    +  *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +  *       }.mapWith {
    +  *         case Point(x, y) => (x, y)
    +  *       }.flatMapWith {
    +  *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +  *       }.groupingBy {
    --- End diff --
    
    Shouldn't this be called `keyingBy`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53946205
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.scala.{JoinedStreams, CoGroupedStreams, KeyedStream, DataStream}
    +import org.apache.flink.streaming.api.windowing.windows.Window
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataStream with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    --- End diff --
    
    import shouldn't work if I'm not mistaken


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-188325067
  
    Just the streaming guide? Not on both the streaming and batch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55993625
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/OnDataSet.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{GroupedDataSet, DataSet}
    +
    +import scala.reflect.ClassTag
    +
    +class OnDataSet[T: TypeInformation](ds: DataSet[T]) {
    +
    +  /**
    +    * Applies a function `fun` to each item of the data set
    +    *
    +    * @param fun The function to be applied to each item
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of R
    +    */
    +  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
    +    ds.map(fun)
    +
    +  /**
    +    * Applies a function `fun` to a partition as a whole
    +    *
    +    * @param fun The function to be applied on the whole partition
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of R
    +    */
    +  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +    ds.mapPartition {
    +      (it, out) =>
    +        out.collect(fun(it.to[Seq]))
    +    }
    +
    +  /**
    +    * Applies a function `fun` to each item of the dataset, producing a collection of items
    +    * that will be flattened in the resulting data set
    +    *
    +    * @param fun The function to be applied to each item
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of R
    +    */
    +  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R] =
    +    ds.flatMap(fun)
    +
    +  /**
    +    * Applies a predicate `fun` to each item of the data set, keeping only those for which
    +    * the predicate holds
    +    *
    +    * @param fun The predicate to be tested on each item
    +    * @return A dataset of R
    +    */
    +  def filterWith(fun: T => Boolean): DataSet[T] =
    +    ds.filter(fun)
    +
    +  /**
    +    * Applies a reducer `fun` to the data set
    +    *
    +    * @param fun The reducing function to be applied on the whole data set
    +    * @tparam R The type of the items in the returned collection
    +    * @return A data set of Rs
    +    */
    +  def reduceWith[R: TypeInformation](fun: (T, T) => T): DataSet[T] =
    +    ds.reduce(fun)
    +
    +  /**
    +    * Applies a reducer `fun` to a grouped data set
    +    *
    +    * @param fun The function to be applied to the whole grouping
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of Rs
    +    */
    +  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +    ds.reduceGroup {
    +      (it, out) =>
    +        out.collect(fun(it.to[Seq]))
    --- End diff --
    
    Same question here with the materialization of the iterator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53950747
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataSet with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    --- End diff --
    
    Yup, leftover from the previous implementation, thanks for pointing it out, I'll fix this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55994602
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/OnWindowedStream.scala ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.scala.{DataStream, WindowedStream}
    +import org.apache.flink.streaming.api.windowing.windows.Window
    +
    +class OnWindowedStream[T, K, W <: Window](ds: WindowedStream[T, K, W]) {
    +
    +  /**
    +    * Applies a reduce function to the window. The window function is called for each evaluation
    +    * of the window for each key individually. The output of the reduce function is interpreted
    +    * as a regular non-windowed stream.
    +    *
    +    * This window will try and pre-aggregate data as much as the window policies permit.
    +    * For example,tumbling time windows can perfectly pre-aggregate the data, meaning that only one
    +    * element per key is stored. Sliding time windows will pre-aggregate on the granularity of the
    +    * slide interval, so a few elements are stored per key (one per slide interval).
    +    * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
    +    * aggregation tree.
    +    *
    +    * @param function The reduce function.
    +    * @return The data stream that is the result of applying the reduce function to the window.
    +    */
    +  def reduceWith(function: (T, T) => T) =
    +    ds.reduce(function)
    +
    +  /**
    +    * Applies the given fold function to each window. The window function is called for each
    +    * evaluation of the window for each key individually. The output of the reduce function is
    +    * interpreted as a regular non-windowed stream.
    +    *
    +    * @param function The fold function.
    +    * @return The data stream that is the result of applying the fold function to the window.
    +    */
    +  def foldWith[R: TypeInformation](initialValue: R)(function: (R, T) => R) =
    +    ds.fold(initialValue)(function)
    +
    +  /**
    +    * Applies the given window function to each window. The window function is called for each
    +    * evaluation of the window for each key individually. The output of the window function is
    +    * interpreted as a regular non-windowed stream.
    +    *
    +    * Arriving data is incrementally aggregated using the given fold function.
    +    *
    +    * @param initialValue The initial value of the fold
    +    * @param foldFunction The fold function that is used for incremental aggregation
    +    * @param windowFunction The window function.
    +    * @return The data stream that is the result of applying the window function to the window.
    +    */
    +  def applyWith[R: TypeInformation](initialValue: R)
    +                                   (foldFunction: (R, T) => R,
    +                                    windowFunction: (K, W, R) => TraversableOnce[R]):
    +      DataStream[R] =
    --- End diff --
    
    Formatting seems to be a bit off here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55997348
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/OnDataSet.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{GroupedDataSet, DataSet}
    +
    +import scala.reflect.ClassTag
    +
    +class OnDataSet[T: TypeInformation](ds: DataSet[T]) {
    +
    +  /**
    +    * Applies a function `fun` to each item of the data set
    +    *
    +    * @param fun The function to be applied to each item
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of R
    +    */
    +  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
    +    ds.map(fun)
    +
    +  /**
    +    * Applies a function `fun` to a partition as a whole
    +    *
    +    * @param fun The function to be applied on the whole partition
    +    * @tparam R The type of the items in the returned data set
    +    * @return A dataset of R
    +    */
    +  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +    ds.mapPartition {
    +      (it, out) =>
    +        out.collect(fun(it.to[Seq]))
    --- End diff --
    
    I think that `it.to[Seq]` can be problematic, since it can happen that an underlying `Vector` is returned here. This means that the whole iterator will be materialized. It is better imo to define `fun: Iterator[T] => R`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r56001580
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.extensions.acceptPartialFunctions._
    +
    +import scala.reflect.ClassTag
    +
    +package object extensions {
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataSet with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = ExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.groupingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnDataSet[T: TypeInformation](ds: DataSet[T]): OnDataSet[T] =
    --- End diff --
    
    Can you remember what the problem was? I quickly changed the method names and on a first glance the tests still seem to pass.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-196321443
  
    @tillrohrmann I just have one minor concern regarding the tests: would `isInstanceOf[StreamMap[Int, Int]]` work? Wouldn't the generic type parameters (the two `Int`s) be erased at runtime?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55995517
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.extensions.acceptPartialFunctions._
    +
    +import scala.reflect.ClassTag
    +
    +package object extensions {
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataSet with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = ExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.groupingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnDataSet[T: TypeInformation](ds: DataSet[T]): OnDataSet[T] =
    --- End diff --
    
    I've tried but the implicit resolution doesn't seem to be smart enough to recognize which overloaded implicit definition to pick, so I had to fall back to this. I can make a couple of attempts using a common object with several methods or perhaps using type classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55994370
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.extensions.acceptPartialFunctions._
    +
    +import scala.reflect.ClassTag
    +
    +package object extensions {
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataSet with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = ExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.groupingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnDataSet[T: TypeInformation](ds: DataSet[T]): OnDataSet[T] =
    +    new OnDataSet[T](ds)
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataSet with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = ExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.groupingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnJoinDataSet[L: TypeInformation, R: TypeInformation](
    +      ds: JoinDataSet[L, R]): OnJoinDataSet[L, R] =
    +    new OnJoinDataSet[L, R](ds)
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataSet with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = ExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.groupingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    --- End diff --
    
    Same here with the JavaDocs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-205234947
  
    @StephanEwen Thanks for the feedback, I've addressed the points you highlighted and took the time to add some missing Scaladoc to the `DataStream` extension.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-188352497
  
    @tillrohrmann I got mixed up reading your proposal and just got what you meant with the `extensions.acceptPartialFunctions` implicit conversion, thanks for the tip, I'll add it to the PR as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-188329671
  
    Of course, you're right. Also in the batch guide :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-205271024
  
    Looks good to me. Will merge this finally!
    
    Thanks for the hard work and the many iterations here...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-195491606
  
    This pull request adds a lot of tests (which is actually good), but all tests fire up a cluster to execute many programs. But need to somehow get this down, as these "fire up mini cluster" tests have made our build times explode.
    
    Is actual program execution needed here? Or is it sufficient to see that the partial function for a certain transformation creates such a transformation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r56000181
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/OnWindowedStream.scala ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.scala.{DataStream, WindowedStream}
    +import org.apache.flink.streaming.api.windowing.windows.Window
    +
    +class OnWindowedStream[T, K, W <: Window](ds: WindowedStream[T, K, W]) {
    +
    +  /**
    +    * Applies a reduce function to the window. The window function is called for each evaluation
    +    * of the window for each key individually. The output of the reduce function is interpreted
    +    * as a regular non-windowed stream.
    +    *
    +    * This window will try and pre-aggregate data as much as the window policies permit.
    +    * For example,tumbling time windows can perfectly pre-aggregate the data, meaning that only one
    +    * element per key is stored. Sliding time windows will pre-aggregate on the granularity of the
    +    * slide interval, so a few elements are stored per key (one per slide interval).
    +    * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
    +    * aggregation tree.
    +    *
    +    * @param function The reduce function.
    +    * @return The data stream that is the result of applying the reduce function to the window.
    +    */
    +  def reduceWith(function: (T, T) => T) =
    +    ds.reduce(function)
    +
    +  /**
    +    * Applies the given fold function to each window. The window function is called for each
    +    * evaluation of the window for each key individually. The output of the reduce function is
    +    * interpreted as a regular non-windowed stream.
    +    *
    +    * @param function The fold function.
    +    * @return The data stream that is the result of applying the fold function to the window.
    +    */
    +  def foldWith[R: TypeInformation](initialValue: R)(function: (R, T) => R) =
    +    ds.fold(initialValue)(function)
    +
    +  /**
    +    * Applies the given window function to each window. The window function is called for each
    +    * evaluation of the window for each key individually. The output of the window function is
    +    * interpreted as a regular non-windowed stream.
    +    *
    +    * Arriving data is incrementally aggregated using the given fold function.
    +    *
    +    * @param initialValue The initial value of the fold
    +    * @param foldFunction The fold function that is used for incremental aggregation
    +    * @param windowFunction The window function.
    +    * @return The data stream that is the result of applying the window function to the window.
    +    */
    +  def applyWith[R: TypeInformation](initialValue: R)
    +                                   (foldFunction: (R, T) => R,
    +                                    windowFunction: (K, W, R) => TraversableOnce[R]):
    --- End diff --
    
    The implementation iterates over each `R` item. However, this approach seems to contrast with [the remarks you have on `it.to[Seq]`](https://github.com/apache/flink/pull/1704#discussion_r55998933); I will restore the `Iterator` here as well. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53947144
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.scala.{JoinedStreams, CoGroupedStreams, KeyedStream, DataStream}
    +import org.apache.flink.streaming.api.windowing.windows.Window
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataStream with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This enables the possibility to directly apply pattern matching
    +  * to decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    +  *     case class Point(x: Double, y: Double)
    +  *     def main(args: Array[String]): Unit = {
    +  *       val env = StreamExecutionEnvironment.getExecutionEnvironment
    +  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +  *       ds.filterWith {
    +  *         case Point(x, _) => x > 1
    +  *       }.reduceWith {
    +  *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +  *       }.mapWith {
    +  *         case Point(x, y) => (x, y)
    +  *       }.flatMapWith {
    +  *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +  *       }.groupingBy {
    +  *         case (id, value) => id
    +  *       }
    +  *     }
    +  *   }
    +  * }}}
    +  *
    +  */
    +package object acceptPartialFunctions {
    +
    +  implicit class OnDataStream[T: TypeInformation](stream: DataStream[T]) {
    +
    +    /**
    +      * Applies a function `fun` to each item of the stream
    +      *
    +      * @param fun The function to be applied to each item
    +      * @tparam R The type of the items in the returned stream
    +      * @return A dataset of R
    +      */
    +    def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] =
    +      stream.map(fun)
    +
    +    /**
    +      * Applies a function `fun` to each item of the stream, producing a collection of items
    +      * that will be flattened in the resulting stream
    +      *
    +      * @param fun The function to be applied to each item
    +      * @tparam R The type of the items in the returned stream
    +      * @return A dataset of R
    +      */
    +    def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] =
    +      stream.flatMap(fun)
    +
    +    /**
    +      * Applies a predicate `fun` to each item of the stream, keeping only those for which
    +      * the predicate holds
    +      *
    +      * @param fun The predicate to be tested on each item
    +      * @return A dataset of R
    +      */
    +    def filterWith(fun: T => Boolean): DataStream[T] =
    +      stream.filter(fun)
    +
    +    /**
    +      * Keys the items according to a keying function `fun`
    +      *
    +      * @param fun The keying function
    +      * @tparam K The type of the key, for which type information must be known
    +      * @return A stream of Ts keyed by Ks
    +      */
    +    def keyingBy[K: TypeInformation: ClassTag](fun: T => K): KeyedStream[T, K] =
    +      stream.keyBy(fun)
    +
    +  }
    +
    +  implicit class OnKeyedStream[T: TypeInformation, K](stream: KeyedStream[T, K]) {
    --- End diff --
    
    What is with the `fold` operation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-188409068
  
    This should cover the missing implementations. I also included the tests I used to test the functionality, let me know if you prefer a wider coverage. I'll provide the docs ASAP.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-205288351
  
    It's been a pleasure, thanks for the patience and for guiding me through the iterations. :smiley: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-196320836
  
    @tillrohrmann Thanks! Outstanding review, it's great to have some guidance when approaching a new project; thank you for the tips on testing as well, it turned out to be way more simple then I thought. :) I'll make sure to apply the changes you mentioned, review the documentation and rewrite and expand the tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-196325179
  
    Yes I think they will be removed at runtime. Thus, I guess it should also be fine to test for `isInstanceOf[StreamMap[_, _]]`. If you also want to check the input/output types properly, then you can use the `TypeInformations` which are accessible from the `OneInputTransformation`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55994329
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.extensions.acceptPartialFunctions._
    +
    +import scala.reflect.ClassTag
    +
    +package object extensions {
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataSet with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = ExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.groupingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnDataSet[T: TypeInformation](ds: DataSet[T]): OnDataSet[T] =
    +    new OnDataSet[T](ds)
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataSet with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = ExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.groupingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    --- End diff --
    
    JavaDocs does not fit to method since it only works on `JoinDataSet`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-199837531
  
    A quick status update: unfortunately I have had no time to work on this in the past week, I plan to get back to the fixes on Thursday or Friday.
    In the meanwhile, I have a small doubt on the usage of `Iterator`: they are indeed very good to allow the user to have access both to the whole `DataSet`/`DataStream` or accessing it one item at a time; however, they are not particularly useful when used with the case-style partial functions: they offer an edge to de-structure a single item like a tuple or a collection like `Seq` (e.g. using the `_ +: rest` operator to only get the item after the first).
    Are we sure we want to keep the `Iterator`? Is there an advantage in having an `Iterator` with this extension? I see to possible solutions: 
    
    1. the easy one: having two methods, one materializing the `Iterator` into a collection and another one accessing the items one a time: the only issue with this would be the need to have to methods with distinct names (otherwise we would be back to square one); this means the user can use the case-style functions to destructure the collection or each item separately; otherwise we can
    2. adopt a slightly more sophisticated solution: wrap the `Iterator` in a `Stream`, which is lazy but also fully destructurable in case-style functions (e.g.: using the `#::` operator). This would require some work as the `Iterator` is stateful with regards of the traversal while the `Stream` is not and we can't just use a naive solution or the semantic difference could lead to some nasty bugs in user code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by stefanobaghino <gi...@git.apache.org>.
Github user stefanobaghino commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r57452214
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.extensions.acceptPartialFunctions._
    +
    +import scala.reflect.ClassTag
    +
    +package object extensions {
    +
    +  /**
    +    * acceptPartialFunctions extends the original DataSet with methods with unique names
    +    * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +    * the fact that overloaded methods taking functions as parameters can't accept partial
    +    * functions as well. This enables the possibility to directly apply pattern matching
    +    * to decompose inputs such as tuples, case classes and collections.
    +    *
    +    * e.g.
    +    * {{{
    +    *   object Main {
    +    *     import org.apache.flink.api.scala.extensions._
    +    *     case class Point(x: Double, y: Double)
    +    *     def main(args: Array[String]): Unit = {
    +    *       val env = ExecutionEnvironment.getExecutionEnvironment
    +    *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +    *       ds.filterWith {
    +    *         case Point(x, _) => x > 1
    +    *       }.reduceWith {
    +    *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    +    *       }.mapWith {
    +    *         case Point(x, y) => (x, y)
    +    *       }.flatMapWith {
    +    *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +    *       }.groupingBy {
    +    *         case (id, value) => id
    +    *       }
    +    *     }
    +    *   }
    +    * }}}
    +    *
    +    */
    +  implicit def acceptPartialFunctionsOnDataSet[T: TypeInformation](ds: DataSet[T]): OnDataSet[T] =
    --- End diff --
    
    Ok, in the end the problem was a clash between the `acceptPartialFunctions` package and implicit conversions. I've solved it by moving the actual implementations under the `impl` package under `acceptPartialFunctions`; now the whole set of conversions can be imported with the `acceptPartialFunctions` name. I was wrong regarding the issue on the resolution of overloaded implicit conversions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1159] Case style anonymous functions no...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1704#issuecomment-188289230
  
    Thanks for your contribution @stefanobaghino. I really like this feature a lot :-)
    
    Currently, the implementation is not complete, because the supported set of API calls is not complete. 
    
    With the current packaging structure one would always have to import `org.apache.flink.api.scala.extensions.acceptPartialFunctions._`. I would rather like to import the following to get partial function support `org.apache.flink.api.scala.extensions.acceptPartialFunctions` or if I want to import all extensions: `org.apache.flink.api.scala.extensions._`. We could achieve this by introducing an `extensions` package object which does something like:
    
    ```
    package object extensions {
      implicit def acceptPartialFunctions[T: TypeInformation](ds: DataStream[T]):
        DataStreamWithPartialFunctionSupport[T] = {
        new DataStreamWithPartialFunctionSupport[T](ds)
      }
    ```
    
    What do you think?
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---