Posted to issues@flink.apache.org by ktzoumas <gi...@git.apache.org> on 2014/07/29 18:42:27 UTC

[GitHub] incubator-flink pull request: Java api functions to sams

GitHub user ktzoumas opened a pull request:

    https://github.com/apache/incubator-flink/pull/85

    Java api functions to sams

    This is FLINK-701. It is not ready to merge yet; I am putting it out for comments and assistance. However, it should be part of release 0.6, as it breaks compatibility.
    
    The patch changes the Java and Record APIs to work on top of SAM (Single Abstract Method) interfaces rather than abstract classes. The SAMs are named Fooable for an operation Foo (e.g., Mappable for Map, Reducible for Reduce), and they replace the former GenericFooer (e.g., GenericMapper) interfaces. 
    
    The original "rich functions" still exist and work as usual, as they implement the aforementioned interfaces. They are called FooFunction (e.g., MapFunction, ReduceFunction), and contain the open(), close(), etc. methods as well.
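
A minimal sketch of how the two layers relate, assuming simplified stand-ins for the real types. `Mappable` and `MapFunction` are named in the description above, but the signatures, the no-argument `open()`/`close()`, and the `SamLayeringSketch`, `applyTo`, `viaLambda` and `viaRichFunction` names are hypothetical and not taken from the patch.

```java
class SamLayeringSketch {
    /** SAM interface: exactly one abstract method, so a lambda can implement it. */
    interface Mappable<IN, OUT> {
        OUT map(IN value);
    }

    /** "Rich" variant: implements the SAM and adds lifecycle hooks (simplified stand-in). */
    abstract static class MapFunction<IN, OUT> implements Mappable<IN, OUT> {
        public void open() {}
        public void close() {}
    }

    /** Code written against the SAM accepts both kinds of function. */
    static <IN, OUT> OUT applyTo(Mappable<IN, OUT> function, IN value) {
        return function.map(value);
    }

    /** Passes a Java 8 lambda as the user function. */
    static int viaLambda(String s) {
        Mappable<String, Integer> length = str -> str.length();
        return applyTo(length, s);
    }

    /** Passes a rich function (anonymous subclass) as the user function. */
    static int viaRichFunction(String s) {
        return applyTo(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        }, s);
    }
}
```

Because the rich class implements the SAM, existing user code that extends it keeps compiling against an API that now takes the interface.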
    
    As part of the refactoring, Cross was changed to return exactly one value rather than taking a collector as input.
    
    GenericCombiner is renamed to FlatCombinable. The general naming rule: FlatFooable takes a Collector as a parameter and does not return a value, while Fooable returns exactly one value (if both interfaces exist). This PR does *not* add a Combinable and does *not* solve FLINK-848; these can be added later without breaking compatibility.
    
    This PR does add an explicit FlatJoinable that is at the same level of inheritance as Joinable. The runtime works only on FlatJoinable objects; Joinables are shallowly wrapped as FlatJoinables (see GeneratedFlatJoinFunction).
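
The shallow wrapping described above could look roughly like the following. This is a sketch under assumptions, not the patch's GeneratedFlatJoinFunction: the `Collector`, `Joinable` and `FlatJoinable` interfaces are simplified stand-ins, and `WrappingFlatJoin` and `joinOnce` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class FlatJoinWrapperSketch {
    /** Simplified stand-in for the collector handed to "flat" functions. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Returns exactly one value per pair of joined elements. */
    interface Joinable<IN1, IN2, OUT> {
        OUT join(IN1 first, IN2 second);
    }

    /** Emits zero or more values per pair through the collector. */
    interface FlatJoinable<IN1, IN2, OUT> {
        void join(IN1 first, IN2 second, Collector<OUT> out);
    }

    /** Adapts a Joinable so that code working only on FlatJoinable can run it. */
    static final class WrappingFlatJoin<IN1, IN2, OUT> implements FlatJoinable<IN1, IN2, OUT> {
        private final Joinable<IN1, IN2, OUT> wrapped;

        WrappingFlatJoin(Joinable<IN1, IN2, OUT> wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void join(IN1 first, IN2 second, Collector<OUT> out) {
            // Forward the single return value to the collector.
            out.collect(wrapped.join(first, second));
        }
    }

    /** Runs one join call through the wrapper and returns everything collected. */
    static List<String> joinOnce(String left, int right) {
        Joinable<String, Integer, String> concat = (a, b) -> a + ":" + b;
        FlatJoinable<String, Integer, String> flat = new WrappingFlatJoin<>(concat);
        List<String> results = new ArrayList<>();
        flat.join(left, right, results::add);
        return results;
    }
}
```

The wrapper adds one indirection per call, which is the price of letting the runtime handle a single interface.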
    
    Two consequences:
    
    (1) FlatCross is removed from the Scala API, as it cannot be supported by the new Cross signature. This is an API design choice that cannot be rectified immediately.
    
    (2) As a side-effect, this PR does add support for Java 8 lambdas in the filter and reduce operators. Lambdas in the other operators do not work yet, as the current TypeExtractor implementation needs to be adapted to extract the return types of lambdas (filter and reduce both have known return types). With that solved, FLINK-612 will probably be resolved as well.
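
To illustrate why return types are harder to extract from lambdas, here is a small reflection sketch. It does not use the actual TypeExtractor; `Mappable` is a simplified stand-in and `recoverableTypeArgs`, `anonymous` and `lambda` are hypothetical helper names. An anonymous class records its generic interface signature in the class file, while the class generated for a lambda exposes only the raw interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class LambdaTypeInfoSketch {
    interface Mappable<IN, OUT> {
        OUT map(IN value);
    }

    /** Counts the generic type arguments that reflection can recover from the function's class. */
    static int recoverableTypeArgs(Object function) {
        for (Type iface : function.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                return ((ParameterizedType) iface).getActualTypeArguments().length;
            }
        }
        // Only raw interfaces were found: no type arguments are available.
        return 0;
    }

    /** Anonymous class: Mappable<String, Integer> is recorded in its class file. */
    static Mappable<String, Integer> anonymous() {
        return new Mappable<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        };
    }

    /** Lambda: the generated class implements the raw Mappable interface only. */
    static Mappable<String, Integer> lambda() {
        return value -> value.length();
    }
}
```

Filter always returns a boolean and reduce returns its input type, so neither needs this information from the function object.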
    
    Things that need to be fixed:
    
    (1) Currently, several POM files require Java 8. This will be changed. The goal is to have some lambda tests that are executed only if Java 8 is present.
    
    (2) If a lambda function is currently provided as an argument to a method other than filter or reduce, an error is displayed to the user. Detecting that the input is a lambda is currently done via string matching on the function name (see FunctionUtils.isLambdaFunction), which (a) is quite iffy, and (b) might not work with all JVMs or might break on several occasions.
    
    If someone knows a good way to "detect if something is a lambda", that would be great! The internet was not too much help until now.
    
    (3) Could I have an extra set of eyes on RegularPactTask, line 531?
    
    (4) DeltaIterationTranslationTest currently does not pass; this is a simple fix, and I will do it asap.
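
The string-matching detection from item (2) can be sketched as follows. The pattern is the one from the FunctionUtils diff in this thread; that it is applied with a full match, as well as the `LambdaNameMatchSketch` and `looksLikeLambdaName` names and all sample class names, are assumptions made for illustration.

```java
import java.util.regex.Pattern;

class LambdaNameMatchSketch {
    // Pattern taken from the FunctionUtils diff in this pull request.
    private static final Pattern LAMBDA_PATTERN =
            Pattern.compile("(\\S+)\\$\\$Lambda\\$(\\d+)/\\d+");

    /** Returns true if the whole class name has the form Outer$$Lambda$N/digits. */
    static boolean looksLikeLambdaName(String className) {
        return LAMBDA_PATTERN.matcher(className).matches();
    }

    public static void main(String[] args) {
        // Hypothetical sample names, not output captured from a real JVM.
        System.out.println(looksLikeLambdaName("org.example.Job$$Lambda$1/471910020"));
        System.out.println(looksLikeLambdaName("org.example.Job$1"));
        System.out.println(looksLikeLambdaName("org.example.Job$$Lambda$14/0x0000000800c03000"));
    }
}
```

The third sample shows the fragility mentioned in (b): a name whose suffix is not purely decimal is not recognized, so the check depends on how a particular JVM happens to name its lambda classes.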

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ktzoumas/incubator-flink java_api_functions_to_sams

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/85.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #85
    
----
commit 9bdc93166a9dfc25b0e83cb86769151281663563
Author: Kostas Tzoumas <ko...@gmail.com>
Date:   2014-07-18T13:37:19Z

    Renamed of Function to RichFunction, created empty Function interface

commit d8bdd90bc35446d65d8a0bbb3a75daa23f9f5b2a
Author: Kostas Tzoumas <ko...@gmail.com>
Date:   2014-07-18T13:37:26Z

    Renamed of Function to RichFunction, created empty Function interface

commit 56b27cc46665fcbc5cd69bfeb19fe695c89d64a3
Author: Kostas Tzoumas <ko...@gmail.com>
Date:   2014-07-18T14:48:48Z

    changes to wrapping function

commit 2eb3aa8a7d301fa5f52c0311ee5cb4fd9920065d
Author: Kostas Tzoumas <ko...@gmail.com>
Date:   2014-07-19T11:22:38Z

    map and reduce operators

commit 762c33b37f78e82af76690a8b6e188c361b02f15
Author: Kostas Tzoumas <ko...@gmail.com>
Date:   2014-07-23T13:39:21Z

    group reduce refactoring

commit d041d15cc16c16ea026d7b73673b0a592ba545d7
Author: Kostas Tzoumas <ko...@gmail.com>
Date:   2014-07-24T09:34:41Z

    group reduce as interface -- shallow implementation

commit aed25492c994c200a50ff4972abb274fc0387c4e
Author: Kostas Tzoumas <ko...@gmail.com>
Date:   2014-07-29T16:34:26Z

    Java and record API based on SAM interfaces

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50639888
  
    so for mappable:
    - why not go with the previous name? ```GenericMapper```
    - or drop the "generic" part? ```Mapper```
    - or go even shorter? ```Map```
    - maybe append "SAM" ? ```MapSAM```/```MapperSAM```



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50726582
  
    Thank you everyone for the terrific feedback. Lambda detection is changed to use SerializedLambda as Stephan suggested (that was great!).
    
    In terms of naming, I do like Mapper, Reducer, FlatMapper, and CoGrouper. The problematic ones are "Filterer", "Joiner", "Crosser", and "FlatJoiner". We can live with those, or call them simply "Filter", "Join", etc., and accept an inconsistent naming scheme (we should probably not use the name "Map", as it would cause frequent conflicts with Java Maps).
    
    A previous thought was MapFunctional, ReduceFunctional, etc, referring to functional interfaces. Any thoughts on this?
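
For reference, a SerializedLambda-based check of the kind mentioned at the top of this comment might look like the sketch below. It is not the code from the patch. It assumes the function interface extends `java.io.Serializable`, in which case the class generated for a lambda carries a private `writeReplace` method that returns a `java.lang.invoke.SerializedLambda`. `Filterable` is a simplified stand-in, and `isLambda`, `lambdaDetected` and `anonymousDetected` are hypothetical names.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

class SerializedLambdaCheckSketch {
    /** Stand-in SAM; being Serializable is what makes this check possible. */
    interface Filterable<T> extends Serializable {
        boolean filter(T value);
    }

    /** Returns true if the object's writeReplace method yields a SerializedLambda. */
    static boolean isLambda(Object function) {
        for (Class<?> c = function.getClass(); c != null; c = c.getSuperclass()) {
            try {
                Method writeReplace = c.getDeclaredMethod("writeReplace");
                writeReplace.setAccessible(true);
                return writeReplace.invoke(function) instanceof SerializedLambda;
            } catch (NoSuchMethodException e) {
                // Not declared on this class: continue with the superclass.
            } catch (ReflectiveOperationException | RuntimeException e) {
                // Reflection was refused or failed: treat as "not a lambda".
                return false;
            }
        }
        return false;
    }

    static boolean lambdaDetected() {
        Filterable<String> f = s -> s.isEmpty();
        return isLambda(f);
    }

    static boolean anonymousDetected() {
        Filterable<String> f = new Filterable<String>() {
            @Override
            public boolean filter(String value) {
                return value.isEmpty();
            }
        };
        return isLambda(f);
    }
}
```

Unlike matching on the class name, this relies on the serialization mechanism for lambdas rather than on a JVM-specific naming scheme, but it cannot recognize lambdas whose target interface is not serializable.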



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15570952
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java ---
    @@ -424,28 +448,77 @@ protected DefaultJoin(DataSet<I1> input1, DataSet<I2> input2,
     				Keys<I1> keys1, Keys<I2> keys2, JoinHint hint)
     		{
     			super(input1, input2, keys1, keys2, 
    -				(JoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultJoinFunction<I1, I2>(),
    +				(FlatJoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultFlatJoinFunction<I1, I2>(),
     				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint);
     		}
     		
     		/**
    -		 * Finalizes a Join transformation by applying a {@link JoinFunction} to each pair of joined elements.<br/>
    +		 * Finalizes a Join transformation by applying a {@link org.apache.flink.api.java.functions.FlatJoinFunction} to each pair of joined elements.<br/>
     		 * Each JoinFunction call returns exactly one element. 
     		 * 
     		 * @param function The JoinFunction that is called for each pair of joined elements.
     		 * @return An EquiJoin that represents the joined result DataSet
     		 * 
    -		 * @see JoinFunction
    +		 * @see org.apache.flink.api.java.functions.FlatJoinFunction
     		 * @see org.apache.flink.api.java.operators.JoinOperator.EquiJoin
     		 * @see DataSet
     		 */
    -		public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R> function) {
    +		public <R> EquiJoin<I1, I2, R> with(FlatJoinable<I1, I2, R> function) {
     			if (function == null) {
     				throw new NullPointerException("Join function must not be null.");
     			}
    +			if (FunctionUtils.isLambdaFunction(function)) {
    +				throw new UnsupportedLambdaExpressionException();
    +			}
     			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
     			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint());
     		}
    +
    +		public <R> EquiJoin<I1, I2, R> with (Joinable<I1, I2, R> function) {
    +			if (function == null) {
    +				throw new NullPointerException("Join function must not be null.");
    +			}
    +			if (FunctionUtils.isLambdaFunction(function)) {
    +				throw new UnsupportedLambdaExpressionException();
    +			}
    +			FlatJoinable generatedFunction = new GeneratedFlatJoinFunction<I1, I2, R> (function);
    +			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
    +			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint());
    +		}
    +
    +		private static class GeneratedFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<Joinable<IN1,IN2,OUT>> implements FlatJoinable<IN1, IN2, OUT> {
    --- End diff --
    
    Minor comment: I personally find that GeneratedFunction sounds like a piece of generated code. Since it is a static wrapper function, I would actually call it WrappingFlatJoinFunction.



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15570737
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java ---
    @@ -0,0 +1,28 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.functions;
    +
    +public class UnsupportedLambdaExpressionException extends RuntimeException {
    --- End diff --
    
    We are using the `InvalidProgramException` in most places to indicate incorrect and unsupported behavior. I would suggest making this UnsupportedLambdaExpressionException a subclass of the InvalidProgramException.
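
A sketch of the suggested hierarchy, using a local stand-in for `InvalidProgramException` so that the example is self-contained. The constructor, the message text, and the `LambdaExceptionSketch` and `caughtAsInvalidProgram` names are hypothetical.

```java
class LambdaExceptionSketch {
    /** Local stand-in for the project's InvalidProgramException. */
    static class InvalidProgramException extends RuntimeException {
        InvalidProgramException(String message) {
            super(message);
        }
    }

    /** Suggested change: extend InvalidProgramException instead of RuntimeException. */
    static class UnsupportedLambdaExpressionException extends InvalidProgramException {
        UnsupportedLambdaExpressionException() {
            // Hypothetical message text.
            super("Lambda expressions are currently supported only in filter and reduce functions.");
        }
    }

    /** Code that already handles InvalidProgramException also handles the new exception. */
    static boolean caughtAsInvalidProgram() {
        try {
            throw new UnsupportedLambdaExpressionException();
        } catch (InvalidProgramException e) {
            return true;
        }
    }
}
```

Callers that already catch the general exception then need no changes to handle the lambda case.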



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50517219
  
    I guess that (a) is also the only variant to make it work in Eclipse, since m2e does not seem to evaluate these profiles. Eclipse users with java < 8 need to close that subproject then.
    
    Can we make a subproject of flink-tests (and still have code in flink-tests)?



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15571375
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---
    @@ -525,7 +526,9 @@ protected void run() throws Exception {
     			// modify accumulators.ll;
     			if (this.stub != null) {
     				// collect the counters from the stub
    -				Map<String, Accumulator<?,?>> accumulators = this.stub.getRuntimeContext().getAllAccumulators();
    +
    +				// !!! Is this.runtimeUdfContext the right thing to return here if this.stub.getRuntimeContext() is null? !!!
    --- End diff --
    
    I agree, both should be the same. And previously, neither was ever null.



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50585614
  
    In `DataSet` you have checks for Map, FlatMap, GroupReduce, to check that no currently unsupported Lambdas are used.
    
    Can you add the same checks for Join, Cross, CoGroup?
    
    I will start working on a java-8 only subproject.



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50731057
  
    I don't find Joiner, Crosser, and FlatJoiner too bad. Filterer sounds strange though...
    FooFunctional would also be fine with me.



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15570974
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java ---
    @@ -0,0 +1,69 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.functions.util;
    +
    +
    +import org.apache.flink.api.common.functions.Function;
    +import org.apache.flink.api.common.functions.RichFunction;
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.configuration.Configuration;
    +
    +import java.util.regex.Pattern;
    +
    +public class FunctionUtils {
    +
    +	private static final Pattern lambdaPattern = Pattern.compile("(\\S+)\\$\\$Lambda\\$(\\d+)/\\d+");
    --- End diff --
    
    Are you sure that this is true?



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50646910
  
    I like Mapper best. A "Map" is something different in most programming languages. :smile: 



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50636024
  
    Maybe it's just me but I don't like the names of the interfaces. Reducible, for example, to me suggests that the thing can be reduced. But the UDF does the reducing and is not reducible.



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50511543
  
    I see two options regarding the Java8 tests: 
    a) We create a "flink-java8-tests" maven module that is only included into the build if java8 is present (we can do this via build profiles that activate at certain java versions).
    
    b) We integrate the java8 tests into the regular "flink-tests" module, into a separate package and do some maven includes / excludes tricks with build profiles.
    
    I vote for option a); it is cleaner and easier to do.



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15629301
  
    --- Diff: flink-java8-tests/pom.xml ---
    @@ -0,0 +1,196 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-parent</artifactId>
    +		<version>0.6-incubating-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-java8-tests</artifactId>
    +	<name>flink-java8-tests</name>
    +
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-core</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-compiler</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-runtime</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-clients</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-java</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>junit</groupId>
    +			<artifactId>junit</artifactId>
    +			<version>4.7</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-scala</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-test-utils</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-java-examples</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-scala-examples</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +		</dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-tests</artifactId>
    +            <version>${project.version}</version>
    +        </dependency>
    +	</dependencies>
    +
    +	<build>
    +		<plugins>
    +            <plugin>
    +                <!-- just define the Java version to be used for compiling and plugins -->
    +                <groupId>org.apache.maven.plugins</groupId>
    +                <artifactId>maven-compiler-plugin</artifactId>
    --- End diff --
    
    Thank you for catching this; it is now fixed.



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15599749
  
    --- Diff: flink-java8-tests/pom.xml ---
    @@ -0,0 +1,196 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-parent</artifactId>
    +		<version>0.6-incubating-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-java8-tests</artifactId>
    +	<name>flink-java8-tests</name>
    +
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-core</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-compiler</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-runtime</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-clients</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-java</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>junit</groupId>
    +			<artifactId>junit</artifactId>
    +			<version>4.7</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-scala</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-test-utils</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-java-examples</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-scala-examples</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +		</dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-tests</artifactId>
    +            <version>${project.version}</version>
    +        </dependency>
    +	</dependencies>
    +
    +	<build>
    +		<plugins>
    +            <plugin>
    +                <!-- just define the Java version to be used for compiling and plugins -->
    +                <groupId>org.apache.maven.plugins</groupId>
    +                <artifactId>maven-compiler-plugin</artifactId>
    --- End diff --
    
    The `maven-compiler-plugin` is defined twice in the file.



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15570502
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java ---
    @@ -0,0 +1,69 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.functions.util;
    +
    +
    +import org.apache.flink.api.common.functions.Function;
    +import org.apache.flink.api.common.functions.RichFunction;
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.configuration.Configuration;
    +
    +import java.util.regex.Pattern;
    +
    +public class FunctionUtils {
    +
    +	private static final Pattern lambdaPattern = Pattern.compile("(\\S+)\\$\\$Lambda\\$(\\d+)/\\d+");
    --- End diff --
    
    We might be able to simplify the lambda detection for now with a check such as `function.getClass().getName().indexOf('/') != -1`. This relies on the fact that the slash character `/` appears only in the class names of lambda functions.
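
A minimal sketch of the name-based check suggested in this comment, for readers who want to try it. `LambdaNameCheck`, `MapLike` and `SAMPLE` are hypothetical names used only for illustration and are not part of the patch. The sketch assumes a JVM that puts a `/` into the names of runtime-generated lambda classes, which is an implementation detail rather than a guarantee of the language.

```java
import java.io.Serializable;

/** Sketch of the name-based lambda check; all names here are hypothetical. */
final class LambdaNameCheck {

    /** Hypothetical stand-in for a serializable SAM interface. */
    interface MapLike<I, O> extends Serializable {
        O map(I value);
    }

    /** A sample lambda, kept in a field so it can be inspected from elsewhere. */
    static final MapLike<Integer, String> SAMPLE = value -> "string " + value;

    /** Returns true if the class name of the given object contains a '/'. */
    static boolean looksLikeLambda(Object function) {
        return function.getClass().getName().indexOf('/') != -1;
    }

    public static void main(String[] args) {
        // An anonymous class is compiled ahead of time and has an ordinary name.
        MapLike<Integer, String> anonymous = new MapLike<Integer, String>() {
            @Override
            public String map(Integer value) {
                return "string " + value;
            }
        };

        // The printed class names differ between JVMs and runs.
        System.out.println(SAMPLE.getClass().getName() + " -> " + looksLikeLambda(SAMPLE));
        System.out.println(anonymous.getClass().getName() + " -> " + looksLikeLambda(anonymous));
    }
}
```

Printing the class names next to the result makes it easy to see what a particular JVM generates before relying on the check.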



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15559264
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---
    @@ -525,7 +526,9 @@ protected void run() throws Exception {
     			// modify accumulators.ll;
     			if (this.stub != null) {
     				// collect the counters from the stub
    -				Map<String, Accumulator<?,?>> accumulators = this.stub.getRuntimeContext().getAllAccumulators();
    +
    +				// !!! Is this.runtimeUdfContext the right thing to return here if this.stub.getRuntimeContext() is null? !!!
    --- End diff --
    
    Yes, I think so, but it might be redundant.
    
    If I am not mistaken, the control flow of `RegularPactTask` results in `this.stub.getRuntimeContext() == this.runtimeUdfContext`. So if one is null, then the other will be null as well and vice versa.
    
    In the `initialize` method, we do the following (among other things):
    1. Create the RuntimeContext (line 434)
    2. Initialize the stub/function (line 441)
    
    Stub initialization sets the runtime context of the stub in line 699 to the previously created runtime context with `FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext)`.
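
The control flow described in this comment can be sketched as follows. This is a simplified illustration, not the code of `RegularPactTask`: every type in it (`Context`, `Function`, `RichFunction`, `Task`, `CountingFunction`) is a hypothetical stand-in, and only the order of the two steps and the name `setFunctionRuntimeContext` are taken from the comment. The assumption that a plain function cannot hold a context is mine.

```java
import java.util.HashMap;
import java.util.Map;

/** All types here are simplified, hypothetical stand-ins, not the Flink classes. */
final class RuntimeContextSketch {

    /** Stand-in for the runtime context that holds the accumulators. */
    static final class Context {
        final Map<String, Long> accumulators = new HashMap<>();
    }

    /** Stand-in for a plain SAM function without lifecycle methods. */
    interface Function { }

    /** Stand-in for a rich function that can hold a runtime context. */
    interface RichFunction extends Function {
        void setRuntimeContext(Context context);
        Context getRuntimeContext();
    }

    /** Hands the context to the function only if the function can store it. */
    static void setFunctionRuntimeContext(Function function, Context context) {
        if (function instanceof RichFunction) {
            ((RichFunction) function).setRuntimeContext(context);
        }
    }

    /** Mirrors the described order: create the context, then initialize the stub. */
    static final class Task {
        final Context runtimeUdfContext = new Context();          // step 1
        final Function stub;

        Task(Function stub) {
            this.stub = stub;
            setFunctionRuntimeContext(stub, runtimeUdfContext);   // step 2
        }

        /** Reading from the task's own context works for rich and plain stubs. */
        Map<String, Long> collectAccumulators() {
            return runtimeUdfContext.accumulators;
        }
    }

    /** Minimal rich function used only for the demonstration. */
    static final class CountingFunction implements RichFunction {
        private Context context;

        @Override
        public void setRuntimeContext(Context context) {
            this.context = context;
        }

        @Override
        public Context getRuntimeContext() {
            return context;
        }
    }

    public static void main(String[] args) {
        CountingFunction rich = new CountingFunction();
        Task task = new Task(rich);
        // Both references point to the same context object.
        System.out.println(rich.getRuntimeContext() == task.runtimeUdfContext);
    }
}
```

In this sketch the stub's context and the task's context are the same object whenever the stub is a rich function, so reading the accumulators from the task's own context gives the same result and also covers stubs that hold no context.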



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15629298
  
    --- Diff: flink-java8-tests/pom.xml ---
    @@ -0,0 +1,196 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-parent</artifactId>
    +		<version>0.6-incubating-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-java8-tests</artifactId>
    +	<name>flink-java8-tests</name>
    +
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-core</artifactId>
    --- End diff --
    
    Thank you for catching this; it is now fixed.



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50553315
  
    Great news :-)
    
    I think it's very good that we plan to have the breaking changes with the next release already.
    
    I made a line comment regarding (3).



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50741310
  
    Another possibility would be MapUDF and friends...



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15599729
  
    --- Diff: flink-java8-tests/pom.xml ---
    @@ -0,0 +1,196 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-parent</artifactId>
    +		<version>0.6-incubating-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-java8-tests</artifactId>
    +	<name>flink-java8-tests</name>
    +
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-core</artifactId>
    --- End diff --
    
    I don't think we need all dependencies here



[GitHub] incubator-flink pull request: [FLINK-701] Common API based on SAM ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-flink/pull/85



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15598168
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java ---
    @@ -0,0 +1,69 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.functions.util;
    +
    +
    +import org.apache.flink.api.common.functions.Function;
    +import org.apache.flink.api.common.functions.RichFunction;
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.configuration.Configuration;
    +
    +import java.util.regex.Pattern;
    +
    +public class FunctionUtils {
    +
    +	private static final Pattern lambdaPattern = Pattern.compile("(\\S+)\\$\\$Lambda\\$(\\d+)/\\d+");
    --- End diff --
    
    I think just checking for `/` should be sufficient (I was unable to find out how IBM's JDK names these classes).
    We could also check it with this condition:
    `function.getClass().isSynthetic() && function.getClass().getDeclaredMethods().length == 1`. I think it is very unlikely that the user is passing a runtime-generated (i.e., synthetic) class as a UDF.
    
    http://stackoverflow.com/questions/23870478/how-to-correctly-determine-that-an-object-is-a-lambda
    http://stackoverflow.com/questions/399546/synthetic-class-in-java
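
A small sketch for trying out the two parts of this condition separately. `SyntheticCheck`, `MapLike` and `SAMPLE` are hypothetical names used only for illustration. The method count is printed rather than combined into one condition, because the generated class of a serializable lambda can declare a `writeReplace` method in addition to the interface method, in which case a count of exactly one would not hold.

```java
import java.io.Serializable;

/** Sketch for inspecting the synthetic flag and method count; names are hypothetical. */
final class SyntheticCheck {

    /** Hypothetical stand-in for a serializable SAM interface. */
    interface MapLike<I, O> extends Serializable {
        O map(I value);
    }

    /** A sample lambda, kept in a field so it can be inspected from elsewhere. */
    static final MapLike<Integer, String> SAMPLE = value -> "string " + value;

    /** True if the object's class carries the synthetic flag. */
    static boolean isSyntheticClass(Object function) {
        return function.getClass().isSynthetic();
    }

    /** Number of methods declared directly on the object's class. */
    static int declaredMethodCount(Object function) {
        return function.getClass().getDeclaredMethods().length;
    }

    public static void main(String[] args) {
        // An anonymous class is ordinary compiled code and is not flagged synthetic.
        MapLike<Integer, String> anonymous = new MapLike<Integer, String>() {
            @Override
            public String map(Integer value) {
                return "string " + value;
            }
        };

        System.out.println("lambda:    synthetic=" + isSyntheticClass(SAMPLE)
                + ", declared methods=" + declaredMethodCount(SAMPLE));
        System.out.println("anonymous: synthetic=" + isSyntheticClass(anonymous)
                + ", declared methods=" + declaredMethodCount(anonymous));
    }
}
```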



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50670878
  
    Simple type extraction from Lambdas (limited to non-generic parameters):
    
    ```java
    	public static Class<?>[] getLambdaParameters(Object lambda) {
    		for (Class<?> clazz = lambda.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
    			try {
    				Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
    				replaceMethod.setAccessible(true);
    				Object serializedForm = replaceMethod.invoke(lambda);
    				
    				if (serializedForm instanceof SerializedLambda) {
    					SerializedLambda sl = (SerializedLambda) serializedForm;
    					return getTypesFromSerializedLambda(sl);
    				}
    			}
    			catch (NoSuchMethodException e) {
    				// fall through the loop and try the next class
    			}
    			catch (Throwable t) {
    				throw new RuntimeException(t);
    			}
    		}
    		
    		throw new IllegalArgumentException("Not a serialized Lambda");
    	}
    	
    	public static Class<?>[] getTypesFromSerializedLambda(SerializedLambda sl) throws Exception {
    		String sig = sl.getImplMethodSignature();
    		
    		if (!sig.startsWith("(")) {
    			throw new Exception("Parse Error");
    		}
    		
    		String parameters = sig.substring(1, sig.indexOf(')'));
    		
    		List<Class<?>> classes = new ArrayList<>();
    		
    		// a lambda without parameters has an empty parameter list: nothing to parse
    		String[] params = parameters.isEmpty() ? new String[0] : parameters.split(";");
    		
    		for (String p : params) {
    			// only object types ("Lpackage/Name;") are supported, no primitives or arrays
    			if (!p.startsWith("L")) {
    				throw new Exception("Parse Error");
    			}
    			
    			p = p.substring(1);
    			p = p.replace('/', '.');
    			classes.add(Class.forName(p));
    		}
    		
    		
    		return classes.toArray(new Class<?>[classes.size()]);
    	}
    ```
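
As a possible alternative to splitting the descriptor string by hand, the sketch below passes the implementation method's signature to the JDK's `MethodType.fromMethodDescriptorString`, which also understands primitive and array types. This is a swapped-in technique, not what the comment above proposes. `LambdaSignatureSketch`, `toSerializedLambda`, `parameterTypes` and `SAMPLE` are hypothetical names. The sketch assumes the lambda implements a serializable interface and captures no variables; captured values would show up as additional leading parameters.

```java
import java.io.Serializable;
import java.lang.invoke.MethodType;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.Arrays;

/** Sketch: read a lambda's parameter types via its SerializedLambda form. */
final class LambdaSignatureSketch {

    /** Serializable SAM interface, modeled on the example in this thread. */
    interface LambdaFunction extends Serializable {
        String doComputation(Integer value);
    }

    /** A non-capturing sample lambda. */
    static final LambdaFunction SAMPLE = theInteger -> "string " + theInteger;

    /** Calls the generated writeReplace method to obtain the serialized form. */
    static SerializedLambda toSerializedLambda(Object lambda) {
        try {
            Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            Object replaced = writeReplace.invoke(lambda);
            if (replaced instanceof SerializedLambda) {
                return (SerializedLambda) replaced;
            }
            throw new IllegalArgumentException("Not a serializable lambda");
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Not a serializable lambda", e);
        }
    }

    /** Parses the JVM method descriptor of the lambda's implementation method. */
    static Class<?>[] parameterTypes(Object lambda) {
        String descriptor = toSerializedLambda(lambda).getImplMethodSignature();
        ClassLoader loader = lambda.getClass().getClassLoader();
        return MethodType.fromMethodDescriptorString(descriptor, loader).parameterArray();
    }

    public static void main(String[] args) {
        System.out.println(toSerializedLambda(SAMPLE).getImplMethodSignature());
        System.out.println(Arrays.toString(parameterTypes(SAMPLE)));
    }
}
```

Like the hand-written parser, this only sees erased types, so generic parameters are still not recovered.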
    
    




[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50667122
  
    I found a good way to detect lambdas. It only works if the SAM interface is serializable, but ours are always.
    
    The trick is to search for the `writeReplace` method from serializable objects and see whether it returns a `SerializedLambda`.
    
    ```java
    import java.lang.invoke.SerializedLambda;
    import java.lang.reflect.Method;
    
    public interface LambdaFunction extends java.io.Serializable {
        String doComputation(Integer value);
    }
    
    public static void main(String[] args) throws Exception {
        LambdaFunction func = (theInteger) -> "string " + String.valueOf(theInteger);
    
        for (Class<?> clazz = func.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
            try {
                Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
                replaceMethod.setAccessible(true);
                Object serializedForm = replaceMethod.invoke(func);
    
                if (serializedForm instanceof SerializedLambda) {
                    SerializedLambda sl = (SerializedLambda) serializedForm;
                    System.out.println(sl);
                    break;
                }
            }
            catch (NoSuchMethodException e) {
                // getDeclaredMethod throws NoSuchMethodException if the method is missing;
                // fall through the loop and try the next class
            }
            catch (Throwable t) {
                t.printStackTrace();
                return;
            }
        }
    }
    ```



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50731751
  
    I am with you. Mapper, Reducer is fine. And Join, Cross, Filter (without *-er*) would also be fine.



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50636733
  
    I agree, the names are not perfect. Does anyone have a good idea for names?



[GitHub] incubator-flink pull request: Common API based on SAM interfaces r...

Posted by ktzoumas <gi...@git.apache.org>.
Github user ktzoumas commented on the pull request:

    https://github.com/apache/incubator-flink/pull/85#issuecomment-50586842
  
    These checks are made in the "with" methods for Join, Cross, CoGroup, which is where the UDF is provided

