Posted to commits@flink.apache.org by al...@apache.org on 2014/09/26 10:07:19 UTC

[1/4] git commit: Make Java 8 Doc fit into new doc scheme

Repository: incubator-flink
Updated Branches:
  refs/heads/master 4a4489936 -> 3c25a97a1


Make Java 8 Doc fit into new doc scheme

This closes #113


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3c25a97a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3c25a97a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3c25a97a

Branch: refs/heads/master
Commit: 3c25a97a1beaac9664d82c80a0fca7cff4068e74
Parents: a0c6ac9
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 25 17:50:14 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Sep 26 10:00:46 2014 +0200

----------------------------------------------------------------------
 docs/_includes/sidenav.html     |   1 +
 docs/java8_programming_guide.md | 138 +++++++++++++++++++++++++++++++++
 docs/java_api_java8.md          | 143 -----------------------------------
 3 files changed, 139 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3c25a97a/docs/_includes/sidenav.html
----------------------------------------------------------------------
diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html
index 40cea81..7c22633 100644
--- a/docs/_includes/sidenav.html
+++ b/docs/_includes/sidenav.html
@@ -23,6 +23,7 @@
     <ul>
       <li><a href="programming_guide.html">Programming Guide</a></li>
       <li><a href="dataset_transformations.html">DataSet Transformations</a></li>
+      <li><a href="java8_programming_guide.html">Java 8 Programming Guide</a></li>
       <li><a href="streaming_guide.html">Streaming Guide</a></li>
       <li><a href="iterations.html">Iterations</a></li>
       <li><a href="spargel_guide.html">Spargel Graph API</a></li>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3c25a97a/docs/java8_programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/java8_programming_guide.md b/docs/java8_programming_guide.md
new file mode 100644
index 0000000..236c882
--- /dev/null
+++ b/docs/java8_programming_guide.md
@@ -0,0 +1,138 @@
+---
+title: "Java 8 Programming Guide"
+---
+
+* This will be replaced by the TOC
+{:toc}
+
+Java 8 introduces several new language features designed for faster and clearer coding. With the most important feature, 
+the so-called "Lambda Expressions", Java 8 opens the door to functional programming. Lambda Expressions allow for implementing and 
+passing functions in a straightforward way without having to declare additional (anonymous) classes.
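The anonymous-class boilerplate that Lambda Expressions replace can be seen in plain Java, independent of Flink (the class and field names below are illustrative, not part of any Flink API):

```java
import java.util.function.Function;

public class SquareFunctions {
    // Java 7 style: an anonymous class implementing the functional interface
    static final Function<Integer, Integer> SQUARE_ANON = new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer i) {
            return i * i;
        }
    };

    // Java 8 style: the same function as a Lambda Expression, no class body needed
    static final Function<Integer, Integer> SQUARE_LAMBDA = i -> i * i;

    public static void main(String[] args) {
        System.out.println(SQUARE_ANON.apply(3));   // prints 9
        System.out.println(SQUARE_LAMBDA.apply(3)); // prints 9
    }
}
```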
+
+The newest version of Flink supports the usage of Lambda Expressions for all operators of the Java API.
+This document shows how to use Lambda Expressions and describes current limitations. For a general introduction to the
+Flink API, please refer to the [Programming Guide](programming_guide.html).
+
+
+### Examples
+
+The following example illustrates how to implement a simple, inline `map()` function that squares its input using a Lambda Expression. 
+The types of the input parameter `i` and of the output of the `map()` function need not be declared, as they are inferred by the Java 8 compiler.
+
+~~~java
+env.fromElements(1, 2, 3)
+// returns the squared i
+.map(i -> i*i)
+.print();
+~~~
+
+The next two examples show different implementations of a function that uses a `Collector` for output. 
+Functions such as `flatMap()` require an output type (in this case `String`) to be defined for the `Collector` in order to be type-safe. 
+If the `Collector` type cannot be inferred from the surrounding context, it needs to be declared manually in the Lambda Expression's parameter list. 
+Otherwise the output will be treated as type `Object`, which can lead to undesired behaviour.
+
+~~~java
+DataSet<Integer> input = env.fromElements(1, 2, 3);
+
+// collector type must be declared
+input.flatMap((Integer number, Collector<String> out) -> {
+    for(int i = 0; i < number; i++) {
+        out.collect("a");
+    }
+})
+// emits "a" once for 1, twice for 2, and three times for 3
+.print();
+~~~
+
+~~~java
+DataSet<Integer> input = env.fromElements(1, 2, 3);
+
+// the collector type need not be declared here, it is inferred from the type of the target dataset
+DataSet<String> manyALetters = input.flatMap((number, out) -> {
+    for(int i = 0; i < number; i++) {
+        out.collect("a");
+    }
+});
+
+// emits "a" once for 1, twice for 2, and three times for 3
+manyALetters.print();
+~~~
+
+The following code demonstrates a word count which makes extensive use of Lambda Expressions.
+
+~~~java
+DataSet<String> input = env.fromElements("Please count", "the words", "but not this");
+		
+// filter out strings that contain "not"
+input.filter(line -> !line.contains("not"))
+// split each line by space
+.map(line -> line.split(" "))
+// emit a pair <word,1> for each array element
+.flatMap((String[] wordArray, Collector<Tuple2<String, Integer>> out) 
+    -> Arrays.stream(wordArray).forEach(t -> out.collect(new Tuple2<>(t, 1)))
+    )
+// group and sum up
+.groupBy(0).sum(1)
+// print
+.print();
+~~~
+
+### Compiler Limitations
+Currently, Flink fully supports jobs containing Lambda Expressions only if they are **compiled with the Eclipse JDT compiler**. 
+
+Only the Eclipse JDT compiler preserves the generic type information necessary to use the entire Lambda Expressions feature type-safely. 
+Other compilers, such as the OpenJDK and Oracle JDK `javac`, discard all generic parameters related to Lambda Expressions. This means that types such as `Tuple2<String, Integer>` or `Collector<String>` declared as a Lambda function's input or output parameter are pruned to `Tuple2` or `Collector` in the compiled `.class` files, which is too little information for Flink. 
+
+How to compile a Flink job that contains Lambda Expressions with the JDT compiler will be covered in the next section. 
+
+However, functions such as `map()` or `filter()` can be implemented with Lambda Expressions using Java 8 compilers other than the Eclipse JDT compiler, as long as the function has no `Collector` or `Iterable` parameters *and* only handles unparameterized types such as `Integer`, `Long`, `String`, or `MyOwnClass` (types without generics!).
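The erasure described above can be observed with plain reflection, without any Flink code: `javac` desugars a lambda body into a synthetic `lambda$...` method in the enclosing class and, unlike the Eclipse JDT compiler, attaches no generic signature to it. A small sketch (class and field names are illustrative):

```java
import java.lang.reflect.Method;
import java.util.List;
import java.util.function.Function;

public class LambdaErasureDemo {
    // The generic parameter type is declared explicitly in the lambda's parameter list ...
    static final Function<List<String>, Integer> FIRST_LENGTH =
            (List<String> words) -> words.get(0).length();

    public static void main(String[] args) {
        // ... but the class file only keeps what the compiler wrote into the
        // synthetic method; with javac the parameter appears as the raw type
        // List, the <String> argument is gone.
        for (Method m : LambdaErasureDemo.class.getDeclaredMethods()) {
            if (m.getName().startsWith("lambda$")) {
                System.out.println(m.getName() + " takes " + m.getGenericParameterTypes()[0]);
            }
        }
    }
}
```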
+
+#### Compile Flink jobs with the Eclipse JDT compiler
+
+If you are using the Eclipse IDE, you can run and debug your Flink code within the IDE without any problems. The Eclipse IDE by default compiles its Java sources with the Eclipse JDT compiler. 
+
+If you are using a different IDE, such as IntelliJ IDEA, or want to package your jar file with Maven to run your job on a cluster, you need to modify your project's `pom.xml` file and build your program with Maven. The [quickstart](setup_quickstart.html) contains preconfigured Maven projects that can be used for new projects or as a reference. Uncomment the mentioned lines in your generated quickstart `pom.xml` file if you want to use Java 8 with Lambda Expressions. 
+
+Alternatively, you can manually insert the following lines into your Maven `pom.xml` file. Maven will then use the Eclipse JDT compiler for compilation.
+
+~~~xml
+<!-- put these lines under "project/build/pluginManagement/plugins" of your pom.xml -->
+
+<plugin>
+    <!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
+    <artifactId>maven-compiler-plugin</artifactId>
+    <configuration>
+        <source>1.8</source>
+        <target>1.8</target>
+        <compilerId>jdt</compilerId>
+    </configuration>
+    <dependencies>
+        <!-- This dependency provides the implementation of compiler "jdt": -->
+        <dependency>
+            <groupId>org.eclipse.tycho</groupId>
+            <artifactId>tycho-compiler-jdt</artifactId>
+            <version>0.21.0</version>
+        </dependency>
+    </dependencies>
+</plugin>
+~~~
+
+If you are using Eclipse for development, the m2e plugin might complain about the lines inserted above and mark your `pom.xml` as invalid. If so, insert the following lines into your `pom.xml`.
+
+~~~xml
+<!-- put these lines under "project/build/pluginManagement/plugins/plugin[groupId="org.eclipse.m2e", artifactId="lifecycle-mapping"]/configuration/lifecycleMappingMetadata/pluginExecutions" of your pom.xml -->
+
+<pluginExecution>
+    <pluginExecutionFilter>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <versionRange>[3.1,)</versionRange>
+        <goals>
+            <goal>testCompile</goal>
+        </goals>
+    </pluginExecutionFilter>
+    <action>
+        <ignore></ignore>
+    </action>
+</pluginExecution>
+~~~

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3c25a97a/docs/java_api_java8.md
----------------------------------------------------------------------
diff --git a/docs/java_api_java8.md b/docs/java_api_java8.md
deleted file mode 100644
index 75e6973..0000000
--- a/docs/java_api_java8.md
+++ /dev/null
@@ -1,143 +0,0 @@
----
-title: "Java API with Java 8"
----
-
-<section id="top">
-Java API with Java 8
------------------------
-
-Java 8 introduces several new language features designed for faster and clearer coding. With the most important feature, 
-the so-called "Lambda Expressions", Java 8 opens the door to functional programming. Lambda Expressions allow for implementing and 
-passing functions in a straightforward way without having to declare additional (anonymous) classes.
-
-The newest version of Flink supports the usage of Lambda Expressions for all operators of the Java API.
-This document shows how to use Lambda Expressions and describes current limitations. For a general introduction to the
-Flink Java API, please refer to the [API guide](java_api_guide.html)
-
-
-### Examples
-
-The following example illustrates how to implement a simple, inline `map()` function that squares its input using a Lambda Expression. 
-The types of input `i` and output parameters of the `map()` function need not to be declared as they are inferred by the Java 8 compiler.
-
-```java
-env.fromElements(1, 2, 3)
-// returns the squared i
-.map(i -> i*i)
-.print();
-```
-
-The next two example show different implementations of a function that uses a `Collector` for output. 
-Functions, such as `flatMap()`, require a output type (in this case `String`) to be defined for the `Collector` in order to be type-safe. 
-If the `Collector` type can not be inferred from the surrounding context, it need to be declared in the Lambda Expression's parameter list manually. 
-Otherwise the output will be treated as type `Object` which can lead to undesired behaviour.
-
-```java
-DataSet<String> input = env.fromElements(1, 2, 3);
-
-// collector type must be declared
-input.flatMap((Integer number, Collector<String> out) -> {
-    for(int i = 0; i < number; i++) {
-        out.collect("a");
-    }
-})
-// returns "a", "a", "aa", "a", "aa" , "aaa"
-.print();
-```
-
-```java
-DataSet<String> input = env.fromElements(1, 2, 3);
-
-// collector type must not be declared, it is inferred from the type of the dataset
-DataSet<String> manyALetters = input.flatMap((number, out) -> {	
-    for(int i = 0; i < number; i++) {
-        o.collect("a");
-    }
-});
-
-// returns "a", "a", "aa", "a", "aa" , "aaa"
-manyALetters.print();
-```
-
-The following code demonstrates a word count which makes extensive use of Lambda Expressions.
-
-```java
-DataSet<String> input = env.fromElements("Please count", "the words", "but not this");
-		
-// filter out strings that contain "not"
-input.filter(line -> !line.contains("not"))
-// split each line by space
-.map(line -> line.split(" "))
-// emit a pair <word,1> for each array element
-.flatMap((String[] wordArray, Collector<Tuple2<String, Integer>> out) 
-    -> Arrays.stream(wordArray).forEach(t -> out.collect(new Tuple2<>(t, 1)))
-    )
-// group and sum up
-.groupBy(0).sum(1)
-// print
-.print();
-```
-
-### Compiler Limitations
-Currently, Flink only supports jobs containing Lambda Expressions completely if they are **compiled with the Eclipse JDT compiler**. 
-
-Only the Eclipse JDT compiler preserves the generic type information necessary to use the entire Lambda Expressions feature type-safely. 
-Other compilers such as the OpenJDK's and Oracle JDK's `javac` throw away all generic parameters related to Lambda Expressions. This means that types such as `Tuple2<String,Integer` or `Collector<String>` declared as a Lambda function input or output parameter will be pruned to `Tuple2` or `Collector` in the compiled `.class` files, which is too little information for the Flink Compiler. 
-
-How to compile a Flink job that contains Lambda Expressions with the JDT compiler will be covered in the next section. 
-
-However, it is possible to implement functions such as `map()` or `filter()` with Lambda Expressions in Java 8 compilers other than the Eclipse JDT compiler as long as the function has no `Collector`s or `Iterable`s *and* only if the function handles unparameterized types such as `Integer`, `Long`, `String`, `MyOwnClass` (types without Generics!).
-
-#### Compile Flink jobs with the Eclipse JDT compiler
-
-If you are using the Eclipse IDE, you can run and debug your Flink code within the IDE without any problems. The Eclipse IDE by default compiles its Java sources with the Eclipse JDT compiler. 
-
-If you are using a different IDE such as IntelliJ IDEA or you want to package your Jar-File with Maven to run your job on a cluster, you need to modify your project's `pom.xml` file and build your program with Maven. The [quickstart](setup_quickstart.html) contains preconfigured Maven projects which can be used for new projects or as a reference. Uncomment the mentioned lines in your generated quickstart `pom.xml` file if you want to use Java 8 with Lambda Expressions. 
-
-Alternatively, you can manually insert the following lines to your Maven `pom.xml` file. Maven will then use the Eclipse JDT compiler for compilation.
-
-```xml
-<!-- put these lines under "project/build/pluginManagement/plugins" of your pom.xml -->
-
-<plugin>
-    <!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
-    <artifactId>maven-compiler-plugin</artifactId>
-    <configuration>
-        <source>1.8</source>
-        <target>1.8</target>
-        <compilerId>jdt</compilerId>
-    </configuration>
-    <dependencies>
-        <!-- This dependency provides the implementation of compiler "jdt": -->
-        <dependency>
-            <groupId>org.eclipse.tycho</groupId>
-            <artifactId>tycho-compiler-jdt</artifactId>
-            <version>0.21.0</version>
-        </dependency>
-    </dependencies>
-</plugin>
-```
-
-If you are using Eclipse for development, the m2e plugin might complain about the inserted lines above and marks your `pom.xml` as invalid. If so, insert the following lines to your `pom.xml`.
-
-```xml
-<!-- put these lines under "project/build/pluginManagement/plugins/plugin[groupId="org.eclipse.m2e", artifactId="lifecycle-mapping"]/configuration/lifecycleMappingMetadata/pluginExecutions" of your pom.xml -->
-
-<pluginExecution>
-    <pluginExecutionFilter>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <versionRange>[3.1,)</versionRange>
-        <goals>
-            <goal>testCompile</goal>
-        </goals>
-    </pluginExecutionFilter>
-    <action>
-        <ignore></ignore>
-    </action>
-</pluginExecution>
-```
-
-
-
-[Back to top](#top)


[4/4] git commit: [FLINK-1062] Type Extraction for Lambdas

Posted by al...@apache.org.
[FLINK-1062] Type Extraction for Lambdas

Checking for Java support

Small changes and meaningful exception for unsupported compilers

pom.xml modified to use JDT compiler

Project structure refactored, examples added

Documentation added

Quickstart adapted


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/849e398a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/849e398a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/849e398a

Branch: refs/heads/master
Commit: 849e398abfcc12ea207fde6e470eb280d39edd8a
Parents: 4a44899
Author: twalthr <in...@twalthr.com>
Authored: Wed Sep 3 16:54:53 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Sep 26 10:00:46 2014 +0200

----------------------------------------------------------------------
 docs/java_api_java8.md                          | 143 +++++++++++
 .../common/functions/util/FunctionUtils.java    |  84 +++++--
 .../java/org/apache/flink/api/java/DataSet.java |  21 +-
 .../UnsupportedLambdaExpressionException.java   |  30 ---
 .../api/java/operators/CoGroupOperator.java     |   5 -
 .../flink/api/java/operators/CrossOperator.java |   5 -
 .../flink/api/java/operators/JoinOperator.java  |  10 +-
 .../api/java/operators/PartitionedDataSet.java  |   8 -
 .../api/java/operators/SortedGrouping.java      |  10 +-
 .../api/java/operators/UnsortedGrouping.java    |   9 +-
 .../flink/api/java/typeutils/TypeExtractor.java | 202 ++++++++++------
 .../java/type/extractor/TypeExtractorTest.java  |   2 +-
 flink-java8-tests/pom.xml                       | 140 -----------
 .../javaApiOperators/lambdas/CoGroupITCase.java |  68 ------
 .../javaApiOperators/lambdas/CrossITCase.java   |  60 -----
 .../javaApiOperators/lambdas/FilterITCase.java  |  94 --------
 .../lambdas/FlatJoinITCase.java                 |  59 -----
 .../javaApiOperators/lambdas/FlatMapITCase.java |  47 ----
 .../lambdas/GroupReduceITCase.java              |  83 -------
 .../javaApiOperators/lambdas/JoinITCase.java    |  59 -----
 .../lambdas/KeySelectorTest.java                |  52 ----
 .../lambdas/LambdaExtractionTest.java           |  82 -------
 .../javaApiOperators/lambdas/MapITCase.java     |  49 ----
 .../javaApiOperators/lambdas/ReduceITCase.java  | 114 ---------
 flink-java8/pom.xml                             | 164 +++++++++++++
 .../example/java8/relational/TPCHQuery10.java   | 221 +++++++++++++++++
 .../example/java8/wordcount/WordCount.java      | 127 ++++++++++
 .../java8/wordcount/util/WordCountData.java     |  71 ++++++
 .../java/type/lambdas/LambdaExtractionTest.java | 240 +++++++++++++++++++
 .../lambdas/AllGroupReduceITCase.java           |  56 +++++
 .../javaApiOperators/lambdas/CoGroupITCase.java |  71 ++++++
 .../javaApiOperators/lambdas/CrossITCase.java   |  70 ++++++
 .../javaApiOperators/lambdas/FilterITCase.java  |  88 +++++++
 .../lambdas/FlatJoinITCase.java                 |  65 +++++
 .../javaApiOperators/lambdas/FlatMapITCase.java |  53 ++++
 .../lambdas/GroupReduceITCase.java              |  66 +++++
 .../javaApiOperators/lambdas/JoinITCase.java    |  66 +++++
 .../javaApiOperators/lambdas/MapITCase.java     |  53 ++++
 .../javaApiOperators/lambdas/ReduceITCase.java  | 106 ++++++++
 flink-quickstart/README.md                      |   4 +
 .../main/resources/archetype-resources/pom.xml  |  65 ++++-
 .../archetype-resources/src/main/java/Job.java  |   2 +-
 pom.xml                                         |   2 +-
 43 files changed, 1931 insertions(+), 1095 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/docs/java_api_java8.md
----------------------------------------------------------------------
diff --git a/docs/java_api_java8.md b/docs/java_api_java8.md
new file mode 100644
index 0000000..75e6973
--- /dev/null
+++ b/docs/java_api_java8.md
@@ -0,0 +1,143 @@
+---
+title: "Java API with Java 8"
+---
+
+<section id="top">
+Java API with Java 8
+-----------------------
+
+Java 8 introduces several new language features designed for faster and clearer coding. With the most important feature, 
+the so-called "Lambda Expressions", Java 8 opens the door to functional programming. Lambda Expressions allow for implementing and 
+passing functions in a straightforward way without having to declare additional (anonymous) classes.
+
+The newest version of Flink supports the usage of Lambda Expressions for all operators of the Java API.
+This document shows how to use Lambda Expressions and describes current limitations. For a general introduction to the
+Flink Java API, please refer to the [API guide](java_api_guide.html)
+
+
+### Examples
+
+The following example illustrates how to implement a simple, inline `map()` function that squares its input using a Lambda Expression. 
+The types of input `i` and output parameters of the `map()` function need not to be declared as they are inferred by the Java 8 compiler.
+
+```java
+env.fromElements(1, 2, 3)
+// returns the squared i
+.map(i -> i*i)
+.print();
+```
+
+The next two example show different implementations of a function that uses a `Collector` for output. 
+Functions, such as `flatMap()`, require a output type (in this case `String`) to be defined for the `Collector` in order to be type-safe. 
+If the `Collector` type can not be inferred from the surrounding context, it need to be declared in the Lambda Expression's parameter list manually. 
+Otherwise the output will be treated as type `Object` which can lead to undesired behaviour.
+
+```java
+DataSet<String> input = env.fromElements(1, 2, 3);
+
+// collector type must be declared
+input.flatMap((Integer number, Collector<String> out) -> {
+    for(int i = 0; i < number; i++) {
+        out.collect("a");
+    }
+})
+// returns "a", "a", "aa", "a", "aa" , "aaa"
+.print();
+```
+
+```java
+DataSet<String> input = env.fromElements(1, 2, 3);
+
+// collector type must not be declared, it is inferred from the type of the dataset
+DataSet<String> manyALetters = input.flatMap((number, out) -> {	
+    for(int i = 0; i < number; i++) {
+        o.collect("a");
+    }
+});
+
+// returns "a", "a", "aa", "a", "aa" , "aaa"
+manyALetters.print();
+```
+
+The following code demonstrates a word count which makes extensive use of Lambda Expressions.
+
+```java
+DataSet<String> input = env.fromElements("Please count", "the words", "but not this");
+		
+// filter out strings that contain "not"
+input.filter(line -> !line.contains("not"))
+// split each line by space
+.map(line -> line.split(" "))
+// emit a pair <word,1> for each array element
+.flatMap((String[] wordArray, Collector<Tuple2<String, Integer>> out) 
+    -> Arrays.stream(wordArray).forEach(t -> out.collect(new Tuple2<>(t, 1)))
+    )
+// group and sum up
+.groupBy(0).sum(1)
+// print
+.print();
+```
+
+### Compiler Limitations
+Currently, Flink only supports jobs containing Lambda Expressions completely if they are **compiled with the Eclipse JDT compiler**. 
+
+Only the Eclipse JDT compiler preserves the generic type information necessary to use the entire Lambda Expressions feature type-safely. 
+Other compilers such as the OpenJDK's and Oracle JDK's `javac` throw away all generic parameters related to Lambda Expressions. This means that types such as `Tuple2<String,Integer` or `Collector<String>` declared as a Lambda function input or output parameter will be pruned to `Tuple2` or `Collector` in the compiled `.class` files, which is too little information for the Flink Compiler. 
+
+How to compile a Flink job that contains Lambda Expressions with the JDT compiler will be covered in the next section. 
+
+However, it is possible to implement functions such as `map()` or `filter()` with Lambda Expressions in Java 8 compilers other than the Eclipse JDT compiler as long as the function has no `Collector`s or `Iterable`s *and* only if the function handles unparameterized types such as `Integer`, `Long`, `String`, `MyOwnClass` (types without Generics!).
+
+#### Compile Flink jobs with the Eclipse JDT compiler
+
+If you are using the Eclipse IDE, you can run and debug your Flink code within the IDE without any problems. The Eclipse IDE by default compiles its Java sources with the Eclipse JDT compiler. 
+
+If you are using a different IDE such as IntelliJ IDEA or you want to package your Jar-File with Maven to run your job on a cluster, you need to modify your project's `pom.xml` file and build your program with Maven. The [quickstart](setup_quickstart.html) contains preconfigured Maven projects which can be used for new projects or as a reference. Uncomment the mentioned lines in your generated quickstart `pom.xml` file if you want to use Java 8 with Lambda Expressions. 
+
+Alternatively, you can manually insert the following lines to your Maven `pom.xml` file. Maven will then use the Eclipse JDT compiler for compilation.
+
+```xml
+<!-- put these lines under "project/build/pluginManagement/plugins" of your pom.xml -->
+
+<plugin>
+    <!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
+    <artifactId>maven-compiler-plugin</artifactId>
+    <configuration>
+        <source>1.8</source>
+        <target>1.8</target>
+        <compilerId>jdt</compilerId>
+    </configuration>
+    <dependencies>
+        <!-- This dependency provides the implementation of compiler "jdt": -->
+        <dependency>
+            <groupId>org.eclipse.tycho</groupId>
+            <artifactId>tycho-compiler-jdt</artifactId>
+            <version>0.21.0</version>
+        </dependency>
+    </dependencies>
+</plugin>
+```
+
+If you are using Eclipse for development, the m2e plugin might complain about the inserted lines above and marks your `pom.xml` as invalid. If so, insert the following lines to your `pom.xml`.
+
+```xml
+<!-- put these lines under "project/build/pluginManagement/plugins/plugin[groupId="org.eclipse.m2e", artifactId="lifecycle-mapping"]/configuration/lifecycleMappingMetadata/pluginExecutions" of your pom.xml -->
+
+<pluginExecution>
+    <pluginExecutionFilter>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <versionRange>[3.1,)</versionRange>
+        <goals>
+            <goal>testCompile</goal>
+        </goals>
+    </pluginExecutionFilter>
+    <action>
+        <ignore></ignore>
+    </action>
+</pluginExecution>
+```
+
+
+
+[Back to top](#top)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
index 042a1e8..8ab2184 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.api.common.functions.util;
 
+import java.lang.reflect.Method;
+
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 
-import java.lang.reflect.Method;
-
 public class FunctionUtils {
 
 	public static void openFunction (Function function, Configuration parameters) throws Exception{
@@ -57,31 +57,71 @@ public class FunctionUtils {
 			return defaultContext;
 		}
 	}
+	
+	public static Method checkAndExtractLambdaMethod(Function function) {
+		try {
+			// get serialized lambda
+			Object serializedLambda = null;
+			for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+				try {
+					Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
+					replaceMethod.setAccessible(true);
+					Object serialVersion = replaceMethod.invoke(function);
 
-	public static boolean isLambdaFunction(Function function) {
-		if (function == null) {
-			throw new IllegalArgumentException();
-		}
-		
-		for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
-			try {
-				Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
-				replaceMethod.setAccessible(true);
-				Object serialVersion = replaceMethod.invoke(function);
-				
-				if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
-					return true;
+					// check if class is a lambda function
+					if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
+
+						// check if SerializedLambda class is present
+						try {
+							Class.forName("java.lang.invoke.SerializedLambda");
+						}
+						catch (Exception e) {
+							throw new UnsupportedOperationException("User code tries to use lambdas, but framework is running with a Java version < 8");
+						}
+						serializedLambda = serialVersion;
+						break;
+					}
+				}
+				catch (NoSuchMethodException e) {
+					// thrown if the method is not there. fall through the loop
 				}
 			}
-			catch (NoSuchMethodException e) {
-				// thrown if the method is not there. fall through the loop
+
+			// not a lambda method -> return null
+			if (serializedLambda == null) {
+				return null;
 			}
-			catch (Throwable t) {
-				// this should not happen, we are not executing any method code.
-				throw new RuntimeException("Error while checking whether function is a lambda.", t);
+
+			// find lambda method
+			Method implClassMethod = serializedLambda.getClass().getDeclaredMethod("getImplClass");
+			Method implMethodNameMethod = serializedLambda.getClass().getDeclaredMethod("getImplMethodName");
+
+			String className = (String) implClassMethod.invoke(serializedLambda);
+			String methodName = (String) implMethodNameMethod.invoke(serializedLambda);
+
+			Class<?> implClass = Class.forName(className.replace('/', '.'));
+
+			Method[] methods = implClass.getDeclaredMethods();
+			Method parameterizedMethod = null;
+			for(Method method : methods) {
+				if(method.getName().equals(methodName)) {
+					if(parameterizedMethod != null) {
+						// It is very unlikely that a class contains multiple methods named e.g. "lambda$2()", but it is possible
+						// Strictly, the signature would also need to be checked, but this is very complex
+						throw new Exception("Lambda method name is not unique.");
+					}
+					else {
+						parameterizedMethod = method;
+					}
+				}
+			}
+			if(parameterizedMethod == null) {
+				throw new Exception("No lambda method found.");
 			}
+			return parameterizedMethod;
+		}
+		catch(Exception e) {
+			throw new RuntimeException("Could not extract lambda method out of function.", e);
 		}
-		
-		return false;
 	}
 }
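The `writeReplace`/`SerializedLambda` technique used by `checkAndExtractLambdaMethod` above can be sketched in a self-contained form. This is a simplified illustration, not the Flink implementation; the `SerFunction` interface extends `Serializable` because only serializable lambdas receive a `writeReplace` method:

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Function;

public class LambdaMethodExtractor {

    // Lambdas only get a writeReplace() method if their target interface is Serializable.
    public interface SerFunction<I, O> extends Function<I, O>, Serializable {}

    // Walk up the class hierarchy looking for the synthetic writeReplace() method;
    // for a serializable lambda it returns a SerializedLambda that names the
    // class and method holding the lambda body.
    public static SerializedLambda extractSerializedLambda(Object function) {
        for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
            try {
                Method writeReplace = clazz.getDeclaredMethod("writeReplace");
                writeReplace.setAccessible(true);
                Object replacement = writeReplace.invoke(function);
                if (replacement instanceof SerializedLambda) {
                    return (SerializedLambda) replacement;
                }
            } catch (NoSuchMethodException e) {
                // writeReplace is not declared on this class; keep walking up
            } catch (Exception e) {
                throw new RuntimeException("Could not inspect function", e);
            }
        }
        return null; // not a serializable lambda
    }

    public static void main(String[] args) {
        SerFunction<Integer, Integer> square = i -> i * i;
        SerializedLambda sl = extractSerializedLambda(square);
        // Prints the class and the name of the synthetic method implementing the lambda
        System.out.println(sl.getImplClass() + "#" + sl.getImplMethodName());
    }
}
```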

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 7a13f2f..76363bd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -20,13 +20,13 @@ package org.apache.flink.api.java;
 
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -36,7 +36,6 @@ import org.apache.flink.api.java.functions.FormattingMapper;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SelectByMaxFunction;
 import org.apache.flink.api.java.functions.SelectByMinFunction;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
@@ -52,6 +51,7 @@ import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.operators.FilterOperator;
 import org.apache.flink.api.java.functions.FirstReducer;
 import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.operators.JoinOperator.JoinHint;
 import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
@@ -60,7 +60,6 @@ import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.PartitionedDataSet;
 import org.apache.flink.api.java.operators.ProjectOperator.Projection;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.ReduceOperator;
 import org.apache.flink.api.java.operators.SortedGrouping;
 import org.apache.flink.api.java.operators.UnionOperator;
@@ -69,11 +68,9 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
 
 /**
  * A DataSet represents a collection of elements of the same type.<br/>
@@ -149,9 +146,6 @@ public abstract class DataSet<T> {
 		if (mapper == null) {
 			throw new NullPointerException("Map function must not be null.");
 		}
-		if (FunctionUtils.isLambdaFunction(mapper)) {
-			throw new UnsupportedLambdaExpressionException();
-		}
 
 		TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, this.getType());
 
@@ -202,9 +196,7 @@ public abstract class DataSet<T> {
 		if (flatMapper == null) {
 			throw new NullPointerException("FlatMap function must not be null.");
 		}
-		if (FunctionUtils.isLambdaFunction(flatMapper)) {
-			throw new UnsupportedLambdaExpressionException();
-		}
+
 		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, this.getType());
 		return new FlatMapOperator<T, R>(this, resultType, flatMapper);
 	}
@@ -349,9 +341,6 @@ public abstract class DataSet<T> {
 		if (reducer == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
-		if (FunctionUtils.isLambdaFunction(reducer)) {
-			throw new UnsupportedLambdaExpressionException();
-		}
 		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getType());
 		return new GroupReduceOperator<T, R>(this, resultType, reducer);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java
deleted file mode 100644
index e9a8a0a..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.functions;
-
-import org.apache.flink.api.common.InvalidProgramException;
-
-public class UnsupportedLambdaExpressionException extends InvalidProgramException {
-
-	private static final long serialVersionUID = -1721898801986321010L;
-
-	public UnsupportedLambdaExpressionException() {
-		super("Java 8 lambda expressions are currently supported only in filter and reduce user-defined functions.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 555c15f..5063af7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -23,7 +23,6 @@ import java.security.InvalidParameterException;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -33,7 +32,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.operators.Keys.FieldPositionKeys;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanBothUnwrappingCoGroupOperator;
@@ -519,9 +517,6 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 					if (function == null) {
 						throw new NullPointerException("CoGroup function must not be null.");
 					}
-					if (FunctionUtils.isLambdaFunction(function)) {
-						throw new UnsupportedLambdaExpressionException();
-					}
 					TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType());
 					return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType);
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index b21bffd..c4cc759 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
@@ -30,7 +29,6 @@ import org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
@@ -134,9 +132,6 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 			if (function == null) {
 				throw new NullPointerException("Cross function must not be null.");
 			}
-			if (FunctionUtils.isLambdaFunction(function)) {
-				throw new UnsupportedLambdaExpressionException();
-			}
 			TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType());
 			return new CrossOperator<I1, I2, R>(input1, input2, function, returnType);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index f1622b4..f796084 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
@@ -38,7 +37,6 @@ import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.operators.Keys.FieldPositionKeys;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanBothUnwrappingJoinOperator;
@@ -501,10 +499,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			if (function == null) {
 				throw new NullPointerException("Join function must not be null.");
 			}
-			if (FunctionUtils.isLambdaFunction(function)) {
-				throw new UnsupportedLambdaExpressionException();
-			}
-			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
+			TypeInformation<R> returnType = TypeExtractor.getFlatJoinReturnTypes(function, getInput1Type(), getInput2Type());
 			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint());
 		}
 
@@ -512,9 +507,6 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			if (function == null) {
 				throw new NullPointerException("Join function must not be null.");
 			}
-			if (FunctionUtils.isLambdaFunction(function)) {
-				throw new UnsupportedLambdaExpressionException();
-			}
 			FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function);
 			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
 			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
index 0ab984d..a30be20 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
@@ -30,7 +29,6 @@ import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -86,9 +84,6 @@ public class PartitionedDataSet<IN> {
 		if (mapper == null) {
 			throw new NullPointerException("Map function must not be null.");
 		}
-		if (FunctionUtils.isLambdaFunction(mapper)) {
-			throw new UnsupportedLambdaExpressionException();
-		}
 		
 		final TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, dataSet.getType());
 		
@@ -139,9 +134,6 @@ public class PartitionedDataSet<IN> {
 		if (flatMapper == null) {
 			throw new NullPointerException("FlatMap function must not be null.");
 		}
-		if (FunctionUtils.isLambdaFunction(flatMapper)) {
-			throw new UnsupportedLambdaExpressionException();
-		}
 		
 		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, dataSet.getType());
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index a0bb920..78fd48b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -18,16 +18,14 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.FirstReducer;
 import java.util.Arrays;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 
@@ -85,10 +83,8 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (reducer == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
-		if (FunctionUtils.isLambdaFunction(reducer)) {
-			throw new UnsupportedLambdaExpressionException();
-		}
-		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
+		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer,
+				this.getDataSet().getType());
 		return new GroupReduceOperator<T, R>(this, resultType, reducer);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 13720ac..71b1828 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -21,15 +21,13 @@ package org.apache.flink.api.java.operators;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.java.functions.FirstReducer;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FirstReducer;
 import org.apache.flink.api.java.functions.SelectByMaxFunction;
 import org.apache.flink.api.java.functions.SelectByMinFunction;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
@@ -133,9 +131,6 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		if (reducer == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
-		if (FunctionUtils.isLambdaFunction(reducer)) {
-			throw new UnsupportedLambdaExpressionException();
-		}
 		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
 
 		return new GroupReduceOperator<T, R>(this, resultType, reducer);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 6177ef5..d30d152 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.JoinFunction;
@@ -47,9 +48,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
 import org.apache.hadoop.io.Writable;
 
 public class TypeExtractor {
@@ -66,98 +67,45 @@ public class TypeExtractor {
 	//  Function specific methods
 	// --------------------------------------------------------------------------------------------
 	
-	@SuppressWarnings("unchecked")
 	public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType) {
-		validateInputType(MapFunction.class, mapInterface.getClass(), 0, inType);
-		if(mapInterface instanceof ResultTypeQueryable) {
-			return ((ResultTypeQueryable<OUT>) mapInterface).getProducedType();
-		}
-		return new TypeExtractor().privateCreateTypeInfo(MapFunction.class, mapInterface.getClass(), 1, inType, null);
+		return getUnaryOperatorReturnType((Function) mapInterface, MapFunction.class, false, false, inType);
 	}
 	
-	@SuppressWarnings("unchecked")
 	public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType) {
-		validateInputType(FlatMapFunction.class, flatMapInterface.getClass(), 0, inType);
-		if(flatMapInterface instanceof ResultTypeQueryable) {
-			return ((ResultTypeQueryable<OUT>) flatMapInterface).getProducedType();
-		}
-		return new TypeExtractor().privateCreateTypeInfo(FlatMapFunction.class, flatMapInterface.getClass(), 1, inType, null);
+		return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType);
 	}
 	
-	@SuppressWarnings("unchecked")
-	public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapInterface, TypeInformation<IN> inType) {
-		validateInputType(MapPartitionFunction.class, mapInterface.getClass(), 0, inType);
-		if(mapInterface instanceof ResultTypeQueryable) {
-			return ((ResultTypeQueryable<OUT>) mapInterface).getProducedType();
-		}
-		return new TypeExtractor().privateCreateTypeInfo(MapPartitionFunction.class, mapInterface.getClass(), 1, inType, null);
+	public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType) {
+		return getUnaryOperatorReturnType((Function) mapPartitionInterface, MapPartitionFunction.class, true, true, inType);
 	}
 	
-	@SuppressWarnings("unchecked")
 	public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface,
 			TypeInformation<IN> inType) {
-		validateInputType(GroupReduceFunction.class, groupReduceInterface.getClass(), 0, inType);
-		if(groupReduceInterface instanceof ResultTypeQueryable) {
-			return ((ResultTypeQueryable<OUT>) groupReduceInterface).getProducedType();
-		}
-		return new TypeExtractor().privateCreateTypeInfo(GroupReduceFunction.class, groupReduceInterface.getClass(), 1, inType, null);
+		return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType);
 	}
 	
-	@SuppressWarnings("unchecked")
-	public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		validateInputType(FlatJoinFunction.class, joinInterface.getClass(), 0, in1Type);
-		validateInputType(FlatJoinFunction.class, joinInterface.getClass(), 1, in2Type);
-		if(joinInterface instanceof ResultTypeQueryable) {
-			return ((ResultTypeQueryable<OUT>) joinInterface).getProducedType();
-		}
-		return new TypeExtractor().privateCreateTypeInfo(FlatJoinFunction.class, joinInterface.getClass(), 2, in1Type, in2Type);
+		return getBinaryOperatorReturnType((Function) joinInterface, FlatJoinFunction.class, false, true, in1Type, in2Type);
 	}
-
-	@SuppressWarnings("unchecked")
+	
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		validateInputType(JoinFunction.class, joinInterface.getClass(), 0, in1Type);
-		validateInputType(JoinFunction.class, joinInterface.getClass(), 1, in2Type);
-		if(joinInterface instanceof ResultTypeQueryable) {
-			return ((ResultTypeQueryable<OUT>) joinInterface).getProducedType();
-		}
-		return new TypeExtractor().privateCreateTypeInfo(JoinFunction.class, joinInterface.getClass(), 2, in1Type, in2Type);
+		return getBinaryOperatorReturnType((Function) joinInterface, JoinFunction.class, false, false, in1Type, in2Type);
 	}
 	
-	@SuppressWarnings("unchecked")
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		validateInputType(CoGroupFunction.class, coGroupInterface.getClass(), 0, in1Type);
-		validateInputType(CoGroupFunction.class, coGroupInterface.getClass(), 1, in2Type);
-		if(coGroupInterface instanceof ResultTypeQueryable) {
-			return ((ResultTypeQueryable<OUT>) coGroupInterface).getProducedType();
-		}
-		return new TypeExtractor().privateCreateTypeInfo(CoGroupFunction.class, coGroupInterface.getClass(), 2, in1Type, in2Type);
+		return getBinaryOperatorReturnType((Function) coGroupInterface, CoGroupFunction.class, true, true, in1Type, in2Type);
 	}
 	
-	@SuppressWarnings("unchecked")
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		validateInputType(CrossFunction.class, crossInterface.getClass(), 0, in1Type);
-		validateInputType(CrossFunction.class, crossInterface.getClass(), 1, in2Type);
-		if(crossInterface instanceof ResultTypeQueryable) {
-			return ((ResultTypeQueryable<OUT>) crossInterface).getProducedType();
-		}
-		return new TypeExtractor().privateCreateTypeInfo(CrossFunction.class, crossInterface.getClass(), 2, in1Type, in2Type);
+		return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false, in1Type, in2Type);
 	}
 	
-	@SuppressWarnings("unchecked")
 	public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface, TypeInformation<IN> inType) {
-		if (FunctionUtils.isLambdaFunction(selectorInterface)) {
-			throw new UnsupportedLambdaExpressionException();
-		}
-		
-		validateInputType(KeySelector.class, selectorInterface.getClass(), 0, inType);
-		if(selectorInterface instanceof ResultTypeQueryable) {
-			return ((ResultTypeQueryable<OUT>) selectorInterface).getProducedType();
-		}
-		return new TypeExtractor().privateCreateTypeInfo(KeySelector.class, selectorInterface.getClass(), 1, inType, null);
+		return getUnaryOperatorReturnType((Function) selectorInterface, KeySelector.class, false, false, inType);
 	}
 	
 	@SuppressWarnings("unchecked")
@@ -168,6 +116,59 @@ public class TypeExtractor {
 		return new TypeExtractor().privateCreateTypeInfo(InputFormat.class, inputFormatInterface.getClass(), 0, null, null);
 	}
 	
+	@SuppressWarnings("unchecked")
+	private static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function, Class<?> baseClass, boolean hasIterable, boolean hasCollector, TypeInformation<IN> inType) {
+		final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
+		if (m != null) {
+			// check for lambda type erasure
+			validateLambdaGenericParameters(m);
+			
+			// parameters must be accessed from the end, since the JVM can prepend additional parameters, e.g. for local variables captured by the lambda
+			final int paramLen = m.getGenericParameterTypes().length - 1;
+			final Type input = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
+			validateInputType((hasIterable) ? removeGenericWrapper(input) : input, inType);
+			if(function instanceof ResultTypeQueryable) {
+				return ((ResultTypeQueryable<OUT>) function).getProducedType();
+			}
+			return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), inType, null);
+		}
+		else {
+			validateInputType(baseClass, function.getClass(), 0, inType);
+			if(function instanceof ResultTypeQueryable) {
+				return ((ResultTypeQueryable<OUT>) function).getProducedType();
+			}
+			return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 1, inType, null);
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	private static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function function, Class<?> baseClass, boolean hasIterables, boolean hasCollector, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+		final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
+		if (m != null) {
+			// check for lambda type erasure
+			validateLambdaGenericParameters(m);
+			
+			// parameters must be accessed from the end, since the JVM can prepend additional parameters, e.g. for local variables captured by the lambda
+			final int paramLen = m.getGenericParameterTypes().length - 1;
+			final Type input1 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 2] : m.getGenericParameterTypes()[paramLen - 1];
+			final Type input2 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
+			validateInputType((hasIterables)? removeGenericWrapper(input1) : input1, in1Type);
+			validateInputType((hasIterables)? removeGenericWrapper(input2) : input2, in2Type);
+			if(function instanceof ResultTypeQueryable) {
+				return ((ResultTypeQueryable<OUT>) function).getProducedType();
+			}
+			return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), in1Type, in2Type);
+		}
+		else {
+			validateInputType(baseClass, function.getClass(), 0, in1Type);
+			validateInputType(baseClass, function.getClass(), 1, in2Type);
+			if(function instanceof ResultTypeQueryable) {
+				return ((ResultTypeQueryable<OUT>) function).getProducedType();
+			}
+			return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 2, in1Type, in2Type);
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Create type information
 	// --------------------------------------------------------------------------------------------
@@ -176,17 +177,20 @@ public class TypeExtractor {
 		return new TypeExtractor().privateCreateTypeInfo(t);
 	}
 	
+	public static <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
+			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+		return new TypeExtractor().privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type);
+	}
+	
+	// ----------------------------------- private methods ----------------------------------------
+	
 	private TypeInformation<?> privateCreateTypeInfo(Type t) {
 		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
 		typeHierarchy.add(t);
 		return createTypeInfoWithTypeHierarchy(typeHierarchy, t, null, null);
 	}
 	
-	public static <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
-			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		return new TypeExtractor().privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type);
-	}
-	
+	// for (Rich)Functions
 	@SuppressWarnings("unchecked")
 	private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
@@ -208,6 +212,15 @@ public class TypeExtractor {
 		return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
 	}
 	
+	// for LambdaFunctions
+	@SuppressWarnings("unchecked")
+	private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type returnType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+		
+		// get info from hierarchy
+		return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
+	}
+	
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
@@ -458,8 +471,18 @@ public class TypeExtractor {
 	//  Validate input
 	// --------------------------------------------------------------------------------------------
 	
+	private static void validateInputType(Type t, TypeInformation<?> inType) {
+		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+		try {
+			validateInfo(typeHierarchy, t, inType);
+		}
+		catch(InvalidTypesException e) {
+			throw new InvalidTypesException("Input mismatch: " + e.getMessage());
+		}
+	}
+	
 	private static void validateInputType(Class<?> baseClass, Class<?> clazz, int inputParamPos, TypeInformation<?> inType) {
-		ArrayList<Type> typeHierarchy = new ArrayList<Type>();		
+		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
 		try {
 			validateInfo(typeHierarchy, getParameterType(baseClass, typeHierarchy, clazz, inputParamPos), inType);
 		}
@@ -644,7 +667,40 @@ public class TypeExtractor {
 	
 	// --------------------------------------------------------------------------------------------
 	//  Utility methods
-	// --------------------------------------------------------------------------------------------	
+	// --------------------------------------------------------------------------------------------
+	
+	private static Type removeGenericWrapper(Type t) {
+		if(t instanceof ParameterizedType &&
+				(Collector.class.isAssignableFrom((Class<?>) ((ParameterizedType) t).getRawType())
+						|| Iterable.class.isAssignableFrom((Class<?>) ((ParameterizedType) t).getRawType()))) {
+			return ((ParameterizedType) t).getActualTypeArguments()[0];
+		}
+		return t;
+	}
+	
+	private static void validateLambdaGenericParameters(Method m) {
+		// check the arguments
+		for (Type t : m.getGenericParameterTypes()) {
+			validateLambdaGenericParameter(t);
+		}
+
+		// check the return type
+		validateLambdaGenericParameter(m.getGenericReturnType());
+	}
+
+	private static void validateLambdaGenericParameter(Type t) {
+		if(!(t instanceof Class)) {
+			return;
+		}
+		final Class<?> clazz = (Class<?>) t;
+
+		if(clazz.getTypeParameters().length > 0) {
+			throw new InvalidTypesException("The generic type parameters of '" + clazz.getSimpleName() + "' are missing. \n"
+					+ "It seems that your compiler has not stored them into the .class file. \n"
+					+ "Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. \n"
+					+ "See the documentation for more information about how to compile jobs containing lambda expressions.");
+		}
+	}
 	
 	private static String encodePrimitiveClass(Class<?> primitiveClass) {
 		final String name = primitiveClass.getName();

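As an aside on the hunk above: the "accessed from the end" indexing plus removeGenericWrapper can be illustrated with a small standalone sketch that is not part of this patch. It uses java.util.function.Consumer as a stand-in for Flink's Collector, and a plain declared method in place of a compiled lambda's synthetic method; the names LambdaTypeSketch, sample, and removeGenericWrapper below are illustrative only.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.function.Consumer;

public class LambdaTypeSketch {

	// mirrors the commit's removeGenericWrapper: strip one level of
	// Iterable<T> / collector-like wrapper and return its type argument
	static Type removeGenericWrapper(Type t) {
		if (t instanceof ParameterizedType) {
			Class<?> raw = (Class<?>) ((ParameterizedType) t).getRawType();
			if (Consumer.class.isAssignableFrom(raw) || Iterable.class.isAssignableFrom(raw)) {
				return ((ParameterizedType) t).getActualTypeArguments()[0];
			}
		}
		return t;
	}

	// stand-in for a coGroup-style lambda method: two Iterable inputs
	// followed by a collector-like output parameter
	static void sample(List<String> in1, List<Integer> in2, Consumer<Long> out) {
	}

	public static void main(String[] args) throws Exception {
		Method m = LambdaTypeSketch.class.getDeclaredMethod(
				"sample", List.class, List.class, Consumer.class);
		Type[] params = m.getGenericParameterTypes();

		// index from the end: the last parameter is the collector,
		// the two before it are the inputs (extra captured parameters,
		// if any, would sit in front and are skipped this way)
		int last = params.length - 1;
		Type outType = removeGenericWrapper(params[last]);     // Long
		Type in2Type = removeGenericWrapper(params[last - 1]); // Integer
		Type in1Type = removeGenericWrapper(params[last - 2]); // String

		if (!in1Type.equals(String.class) || !in2Type.equals(Integer.class)
				|| !outType.equals(Long.class)) {
			throw new AssertionError(in1Type + ", " + in2Type + ", " + outType);
		}
		System.out.println("ok");
	}
}
```

Running it prints "ok"; List unwraps because it is an Iterable, and Consumer plays the role of the Collector case in the patch.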
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index 823c340..8fb56b8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -253,7 +253,7 @@ public class TypeExtractorTest {
 			}			
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>"), (TypeInformation) TypeInfoParser.parse("String"));
+		TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>"), (TypeInformation) TypeInfoParser.parse("String"));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(2, ti.getArity());

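The compiler caveat behind validateLambdaGenericParameters in the first hunk (only some compilers, notably Eclipse JDT, store a lambda's generic signature) boils down to one reflective check: a parameter that arrives as a raw Class yet declares type variables has lost its actual type arguments. A hypothetical standalone sketch of that check, with illustrative names not taken from the commit:

```java
import java.lang.reflect.Type;
import java.util.List;

public class ErasedGenericsCheck {

	// mirrors validateLambdaGenericParameter: a raw Class that declares
	// type parameters means the compiler did not record the actual type
	// arguments for this lambda parameter
	static boolean hasMissingTypeArguments(Type t) {
		return (t instanceof Class) && ((Class<?>) t).getTypeParameters().length > 0;
	}

	public static void main(String[] args) {
		// List declares one type parameter <E>, so seen as a raw Class
		// it indicates erased generic information
		if (!hasMissingTypeArguments(List.class)) {
			throw new AssertionError("raw List should be flagged");
		}
		// String declares no type parameters, so a plain Class is fine
		if (hasMissingTypeArguments(String.class)) {
			throw new AssertionError("String should not be flagged");
		}
		System.out.println("ok");
	}
}
```

When the check fires, the patch throws InvalidTypesException with the pointer to the documentation on compiling jobs that contain lambda expressions.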
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8-tests/pom.xml b/flink-java8-tests/pom.xml
deleted file mode 100644
index 8615251..0000000
--- a/flink-java8-tests/pom.xml
+++ /dev/null
@@ -1,140 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-parent</artifactId>
-		<version>0.7-incubating-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-java8-tests</artifactId>
-	<name>flink-java8-tests</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>4.7</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<!-- just define the Java version to be used for compiling and plugins -->
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.1</version><!--$NO-MVN-MAN-VER$-->
-				<configuration>
-					<source>1.8</source>
-					<target>1.8</target>
-					<!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<systemPropertyVariables>
-						<log.level>WARN</log.level>
-					</systemPropertyVariables>
-				</configuration>
-			</plugin>
-			<plugin>
-				<artifactId>maven-failsafe-plugin</artifactId>
-				<configuration>
-					<systemPropertyVariables>
-						<log.level>WARN</log.level>
-					</systemPropertyVariables>
-				</configuration>
-			</plugin>
-		</plugins>
-		
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>
-											org.apache.maven.plugins
-										</groupId>
-										<artifactId>
-											maven-assembly-plugin
-										</artifactId>
-										<versionRange>
-											[2.4,)
-										</versionRange>
-										<goals>
-											<goal>single</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore></ignore>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
deleted file mode 100644
index d5602c6..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class CoGroupITCase implements Serializable {
-
-	@Test
-	public void testCoGroupLambda() {
-		try {
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			DataSet<Tuple2<Integer, String>> left = env.fromElements(
-					new Tuple2<Integer, String>(1, "hello"),
-					new Tuple2<Integer, String>(2, "what's"),
-					new Tuple2<Integer, String>(2, "up")
-			);
-			DataSet<Tuple2<Integer, String>> right = env.fromElements(
-					new Tuple2<Integer, String>(1, "not"),
-					new Tuple2<Integer, String>(1, "much"),
-					new Tuple2<Integer, String>(2, "really")
-			);
-			DataSet<Tuple2<Integer,String>> joined = left.coGroup(right).where(0).equalTo(0)
-					.with((values1, values2, out) -> {
-						int sum = 0;
-						for (Tuple2<Integer, String> next : values1) {
-							sum += next.f0;
-						}
-						for (Tuple2<Integer, String> next : values2) {
-							sum += next.f0;
-						}
-					});
-			env.execute();
-
-
-		} catch (UnsupportedLambdaExpressionException e) {
-			// Success
-			return;
-		} catch (Exception e) {
-			Assert.fail();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
deleted file mode 100644
index a3dee07..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class CrossITCase implements Serializable {
-
-	@Test
-	public void testCrossLambda() {
-		try {
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			DataSet<Tuple2<Integer, String>> left = env.fromElements(
-					new Tuple2<Integer, String>(1, "hello"),
-					new Tuple2<Integer, String>(2, "what's"),
-					new Tuple2<Integer, String>(2, "up")
-			);
-			DataSet<Tuple2<Integer, String>> right = env.fromElements(
-					new Tuple2<Integer, String>(1, "not"),
-					new Tuple2<Integer, String>(1, "much"),
-					new Tuple2<Integer, String>(2, "really")
-			);
-			DataSet<Tuple2<Integer,String>> joined = left.cross(right)
-					.with((t,s) -> new Tuple2<Integer, String> (t.f0 + s.f0, t.f1 + " " + s.f1));
-
-		} catch (UnsupportedLambdaExpressionException e) {
-			// Success
-			return;
-		} catch (Exception e) {
-			Assert.fail();
-		}
-	}
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
deleted file mode 100644
index c600156..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-@SuppressWarnings("serial")
-public class FilterITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
-													"4,3,Hello world, how are you?\n";
-
-	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
-		data.add(new Tuple3<Integer, Long, String>(1,1l,"Hi"));
-		data.add(new Tuple3<Integer, Long, String>(2,2l,"Hello"));
-		data.add(new Tuple3<Integer, Long, String>(3,2l,"Hello world"));
-		data.add(new Tuple3<Integer, Long, String>(4,3l,"Hello world, how are you?"));
-		data.add(new Tuple3<Integer, Long, String>(5,3l,"I am fine."));
-		data.add(new Tuple3<Integer, Long, String>(6,3l,"Luke Skywalker"));
-		data.add(new Tuple3<Integer, Long, String>(7,4l,"Comment#1"));
-		data.add(new Tuple3<Integer, Long, String>(8,4l,"Comment#2"));
-		data.add(new Tuple3<Integer, Long, String>(9,4l,"Comment#3"));
-		data.add(new Tuple3<Integer, Long, String>(10,4l,"Comment#4"));
-		data.add(new Tuple3<Integer, Long, String>(11,5l,"Comment#5"));
-		data.add(new Tuple3<Integer, Long, String>(12,5l,"Comment#6"));
-		data.add(new Tuple3<Integer, Long, String>(13,5l,"Comment#7"));
-		data.add(new Tuple3<Integer, Long, String>(14,5l,"Comment#8"));
-		data.add(new Tuple3<Integer, Long, String>(15,5l,"Comment#9"));
-		data.add(new Tuple3<Integer, Long, String>(16,6l,"Comment#10"));
-		data.add(new Tuple3<Integer, Long, String>(17,6l,"Comment#11"));
-		data.add(new Tuple3<Integer, Long, String>(18,6l,"Comment#12"));
-		data.add(new Tuple3<Integer, Long, String>(19,6l,"Comment#13"));
-		data.add(new Tuple3<Integer, Long, String>(20,6l,"Comment#14"));
-		data.add(new Tuple3<Integer, Long, String>(21,6l,"Comment#15"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-				filter(value -> value.f2.contains("world"));
-		filterDs.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
deleted file mode 100644
index b201216..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class FlatJoinITCase implements Serializable {
-
-	@Test
-	public void testFlatJoinLambda() {
-		try {
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			DataSet<Tuple2<Integer, String>> left = env.fromElements(
-					new Tuple2<Integer, String>(1, "hello"),
-					new Tuple2<Integer, String>(2, "what's"),
-					new Tuple2<Integer, String>(2, "up")
-			);
-			DataSet<Tuple2<Integer, String>> right = env.fromElements(
-					new Tuple2<Integer, String>(1, "not"),
-					new Tuple2<Integer, String>(1, "much"),
-					new Tuple2<Integer, String>(2, "really")
-			);
-			DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
-					.with((t,s,out) -> out.collect(new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1)));
-		} catch (UnsupportedLambdaExpressionException e) {
-			// Success
-			return;
-		} catch (Exception e) {
-			Assert.fail();
-		}
-	}
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
deleted file mode 100644
index e052914..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class FlatMapITCase implements Serializable {
-
-	@Test
-	public void testFlatMapLambda() {
-		try {
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
-			DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
-			env.execute();
-		} catch (UnsupportedLambdaExpressionException e) {
-			// Success
-			return;
-		} catch (Exception e) {
-			Assert.fail();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
deleted file mode 100644
index 123fb16..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class GroupReduceITCase implements Serializable {
-
-	@Test
-	public void testAllGroupReduceLambda() {
-		try {
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
-			DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
-				String conc = "";
-				for (String s : values) {
-					conc = conc.concat(s);
-				}
-				out.collect(conc);
-			});
-			env.execute();
-		} catch (UnsupportedLambdaExpressionException e) {
-			// Success
-			return;
-		} catch (Exception e) {
-			Assert.fail();
-		}
-	}
-
-	@Test
-	public void testGroupReduceLambda() {
-		try {
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			DataSet<Tuple2<Integer,String>> stringDs = env.fromElements(
-					new Tuple2<Integer,String>(1, "aa"),
-					new Tuple2<Integer,String>(2, "ab"),
-					new Tuple2<Integer,String>(1, "ac"),
-					new Tuple2<Integer,String>(2, "ad")
-			);
-			DataSet<String> concatDs = stringDs
-					.groupBy(0)
-					.reduceGroup((values, out) -> {
-						String conc = "";
-						for (Tuple2<Integer,String> next : values) {
-							conc = conc.concat(next.f1);
-						}
-						out.collect(conc);
-					});
-			env.execute();
-		} catch (UnsupportedLambdaExpressionException e) {
-			// Success
-			return;
-		} catch (Exception e) {
-			Assert.fail();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
deleted file mode 100644
index b41ddee..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class JoinITCase implements Serializable {
-
-	@Test
-	public void testJoinLambda() {
-		try {
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			DataSet<Tuple2<Integer, String>> left = env.fromElements(
-				new Tuple2<Integer, String>(1, "hello"),
-				new Tuple2<Integer, String>(2, "what's"),
-				new Tuple2<Integer, String>(2, "up")
-			);
-			DataSet<Tuple2<Integer, String>> right = env.fromElements(
-					new Tuple2<Integer, String>(1, "not"),
-					new Tuple2<Integer, String>(1, "much"),
-					new Tuple2<Integer, String>(2, "really")
-			);
-			DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
-					.with((t,s) -> new Tuple2<Integer,String>(t.f0, t.f1 + " " + t.f1));
-
-		} catch (UnsupportedLambdaExpressionException e) {
-			// Success
-			return;
-		} catch (Exception e) {
-			Assert.fail();
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
deleted file mode 100644
index 6613d68..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/KeySelectorTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.junit.Assert;
-
-public class KeySelectorTest {
-
-	public void testSelectorLambda() {
-		try {
-			KeySelector<Tuple2<String, Integer>, String> selector = (t) -> t.f0;
-			
-			try {
-				TypeExtractor.getKeySelectorTypes(selector, 
-						new TupleTypeInfo<Tuple2<String, Integer>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
-				Assert.fail("No unsupported lambdas exception");
-			}
-			catch (UnsupportedLambdaExpressionException e) {
-				// good
-			}
-			catch (Exception e) {
-				Assert.fail("Wrong exception type");
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java
deleted file mode 100644
index ec2ebf2..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class LambdaExtractionTest {
-
-	@Test
-	public void testIdentifyLambdas() {
-		try {
-			MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
-				@Override
-				public Integer map(String value) { return Integer.parseInt(value); }
-			};
-			
-			MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
-				@Override
-				public Integer map(String value) { return Integer.parseInt(value); }
-			};
-			
-			MapFunction<?, ?> fromProperClass = new StaticMapper();
-			
-			MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
-				@Override
-				public Tuple2<Integer, Long> map(Integer value) {
-					return new Tuple2<Integer, Long>(value, 1L);
-				}
-			};
-			
-			MapFunction<String, Integer> lambda = (str) -> Integer.parseInt(str);
-			
-			assertFalse(FunctionUtils.isLambdaFunction(anonymousFromInterface));
-			assertFalse(FunctionUtils.isLambdaFunction(anonymousFromClass));
-			assertFalse(FunctionUtils.isLambdaFunction(fromProperClass));
-			assertFalse(FunctionUtils.isLambdaFunction(fromDerived));
-			assertTrue(FunctionUtils.isLambdaFunction(lambda));
-			assertTrue(FunctionUtils.isLambdaFunction(STATIC_LAMBDA));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	public static class StaticMapper implements MapFunction<String, Integer> {
-
-		@Override
-		public Integer map(String value) { return Integer.parseInt(value); }
-	}
-	
-	public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
-
-		@Override
-		public Tuple2<T, Long> map(T value) throws Exception;
-	}
-	
-	private static final MapFunction<String, Integer> STATIC_LAMBDA = (str) -> Integer.parseInt(str);
-}
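
The deleted LambdaExtractionTest above distinguishes lambdas from anonymous classes via FunctionUtils.isLambdaFunction, which appears here only through its call sites. A plain-Java sketch of one way such a check could work: lambda instances are compiled to JVM-synthetic classes, while anonymous inner classes are not. (Relying on the synthetic flag is HotSpot-observed behavior, not guaranteed by the language spec, and the MapFn interface below is a stand-in for Flink's MapFunction.)

```java
public class LambdaDetection {

	// Stand-in for Flink's MapFunction, just to have a functional interface.
	interface MapFn<I, O> {
		O map(I in);
	}

	public static boolean looksLikeLambda(Object f) {
		// Classes generated for lambda expressions by LambdaMetafactory are
		// marked ACC_SYNTHETIC; classes written in source (including anonymous
		// inner classes) are not.
		return f.getClass().isSynthetic();
	}

	public static void main(String[] args) {
		MapFn<String, Integer> anon = new MapFn<String, Integer>() {
			@Override
			public Integer map(String s) { return Integer.parseInt(s); }
		};
		MapFn<String, Integer> lambda = s -> Integer.parseInt(s);

		System.out.println(looksLikeLambda(anon));   // false: anonymous classes are not synthetic
		System.out.println(looksLikeLambda(lambda)); // true: lambda classes are synthetic
	}
}
```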

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
deleted file mode 100644
index dd65e49..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class MapITCase implements Serializable{
-
-	@Test
-	public void TestMapLambda () {
-		try {
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
-			DataSet<String> mappedDs = stringDs.map (s -> s.replace("a", "b"));
-			env.execute();
-		}
-		catch (UnsupportedLambdaExpressionException e) {
-			// Success
-			return;
-		}
-		catch (Exception e) {
-			Assert.fail();
-		}
-	}
-}


[2/4] git commit: Disable Checkstyle in java8 package

Posted by al...@apache.org.
Disable Checkstyle in java8 package

Checkstyle doesn't seem to support Java 8 yet.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a0c6ac90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a0c6ac90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a0c6ac90

Branch: refs/heads/master
Commit: a0c6ac905d24d4b02f79174dbf897ba97397b70b
Parents: 849e398
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 25 17:44:55 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Sep 26 10:00:46 2014 +0200

----------------------------------------------------------------------
 flink-java8/pom.xml | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0c6ac90/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index e733af5..d453f41 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -35,6 +35,10 @@ under the License.
 
 	<packaging>jar</packaging>
 
+	<properties>
+		<checkstyle.skip>true</checkstyle.skip>
+	</properties>
+
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>


[3/4] [FLINK-1062] Type Extraction for Lambdas

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
deleted file mode 100644
index b8344a9..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-@SuppressWarnings("serial")
-public class ReduceITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
-			"2,3,2,Hallo Welt wie,1\n" +
-			"2,2,1,Hallo Welt,2\n" +
-			"3,9,0,P-),2\n" +
-			"3,6,5,BCD,3\n" +
-			"4,17,0,P-),1\n" +
-			"4,17,0,P-),2\n" +
-			"5,11,10,GHI,1\n" +
-			"5,29,0,P-),2\n" +
-			"5,25,0,P-),3\n";
-	
-	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(1,1l,0,"Hallo",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,2l,1,"Hallo Welt",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,3l,2,"Hallo Welt wie",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,4l,3,"Hallo Welt wie gehts?",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,5l,4,"ABC",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,6l,5,"BCD",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,7l,6,"CDE",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,8l,7,"DEF",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,9l,8,"EFG",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,10l,9,"FGH",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,11l,10,"GHI",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,12l,11,"HIJ",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,13l,12,"IJK",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,14l,13,"JKL",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,15l,14,"KLM",2l));
-
-		Collections.shuffle(data);
-
-		TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> type = new
-				TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>>(
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO,
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-	
-	private String resultPath;
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
-				.groupBy(4, 0)
-				.reduce((in1, in2) -> {
-					Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
-					out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
-					return out;
-				});
-
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}
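
The deleted ReduceITCase above exercises a lambda reduce over grouped tuples (`groupBy(4, 0).reduce(...)`). Independent of Flink, the same grouped-sum idea can be sketched with plain Java 8 streams; the two-element int arrays here are a minimal stand-in for Flink tuples:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupedSum {

	// pairs: {key, value}; mirrors DataSet.groupBy(0) followed by a summing reduce.
	public static Map<Integer, Long> sumByKey(List<int[]> pairs) {
		return pairs.stream().collect(Collectors.groupingBy(
				p -> p[0],
				Collectors.summingLong(p -> p[1])));
	}

	public static void main(String[] args) {
		List<int[]> data = Arrays.asList(
				new int[]{1, 1}, new int[]{2, 2}, new int[]{2, 3});
		System.out.println(sumByKey(data)); // {1=1, 2=5}
	}
}
```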

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
new file mode 100644
index 0000000..e733af5
--- /dev/null
+++ b/flink-java8/pom.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.7-incubating-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-java8</artifactId>
+	<name>flink-java8</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.7</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<!-- just define the Java version to be used for compiling and plugins -->
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version><!--$NO-MVN-MAN-VER$-->
+				<configuration>
+					<source>1.8</source>
+					<target>1.8</target>
+					<!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<systemPropertyVariables>
+						<log.level>WARN</log.level>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-failsafe-plugin</artifactId>
+				<configuration>
+					<systemPropertyVariables>
+						<log.level>WARN</log.level>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
+		</plugins>
+		
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
+					<artifactId>maven-compiler-plugin</artifactId>
+					<configuration>
+						<source>1.8</source>
+						<target>1.8</target>
+						<compilerId>jdt</compilerId>
+					</configuration>
+					<dependencies>
+						<!-- This dependency provides the implementation of compiler "jdt": -->
+						<dependency>
+							<groupId>org.eclipse.tycho</groupId>
+							<artifactId>tycho-compiler-jdt</artifactId>
+							<version>0.21.0</version>
+						</dependency>
+					</dependencies>
+				</plugin>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-assembly-plugin</artifactId>
+										<versionRange>[2.4,)</versionRange>
+										<goals>
+											<goal>single</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-compiler-plugin</artifactId>
+										<versionRange>[3.1,)</versionRange>
+										<goals>
+											<goal>testCompile</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java b/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
new file mode 100644
index 0000000..9b67a43
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java8.relational;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+
+/**
+ * This program implements a modified version of the TPC-H query 10.
+ * The original query can be found at
+ * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
+ * 
+ * <p>
+ * This program implements the following SQL equivalent:
+ * 
+ * <p>
+ * <code><pre>
+ * SELECT 
+ *        c_custkey,
+ *        c_name, 
+ *        c_address,
+ *        n_name, 
+ *        c_acctbal
+ *        SUM(l_extendedprice * (1 - l_discount)) AS revenue,  
+ * FROM   
+ *        customer, 
+ *        orders, 
+ *        lineitem, 
+ *        nation 
+ * WHERE 
+ *        c_custkey = o_custkey 
+ *        AND l_orderkey = o_orderkey 
+ *        AND YEAR(o_orderdate) > '1990' 
+ *        AND l_returnflag = 'R' 
+ *        AND c_nationkey = n_nationkey 
+ * GROUP BY 
+ *        c_custkey, 
+ *        c_name, 
+ *        c_acctbal, 
+ *        n_name, 
+ *        c_address
+ * </pre></code>
+ *        
+ * <p>
+ * Compared to the original TPC-H query this version does not print 
+ * c_phone and c_comment, only filters by years greater than 1990 instead of
+ * a period of 3 months, and does not sort the result by revenue.
+ * 
+ * <p>
+ * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
+ * 
+ * <p>
+ * Usage: <code>TPCHQuery10 &lt;customer-csv path&gt; &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;nation-csv path&gt; &lt;result path&gt;</code><br>
+ *  
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li> inline-defined functions using Java 8 Lambda Expressions
+ * </ul>
+ */
+public class TPCHQuery10 {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get customer data set: (custkey, name, address, nationkey, acctbal) 
+		DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
+
+		// get orders data set: (orderkey, custkey, orderdate)
+		DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
+
+		// get lineitem data set: (orderkey, extendedprice, discount, returnflag)
+		DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
+
+		// get nation data set: (nationkey, name)
+		DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
+
+		// orders filtered by year: (orderkey, custkey)
+		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
+				// filter by year
+				orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
+				// project fields out that are no longer required
+				.project(0,1).types(Integer.class, Integer.class);
+
+		// lineitems filtered by flag: (orderkey, extendedprice, discount)
+		DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag = 
+				// filter by flag
+				lineitems.filter(lineitem -> lineitem.f3.equals("R"))
+				// project fields out that are no longer required
+				.project(0,1,2).types(Integer.class, Double.class, Double.class);
+
+		// join orders with lineitems: (custkey, extendedprice, discount)
+		DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey = 
+				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
+									.where(0).equalTo(0)
+									.projectFirst(1).projectSecond(1,2)
+									.types(Integer.class, Double.class, Double.class);
+
+		// aggregate for revenue: (custkey, revenue)
+		DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
+				// calculate the revenue for each item
+				// revenue per item = l_extendedprice * (1 - l_discount)
+				.map(i -> new Tuple2<>(i.f0, i.f1 * (1 - i.f2)))
+				// aggregate the revenues per item to revenue per customer
+				.groupBy(0).sum(1);
+
+		// join customer with nation (custkey, name, address, nationname, acctbal)
+		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
+						.joinWithTiny(nations)
+						.where(3).equalTo(0)
+						.projectFirst(0,1,2).projectSecond(1).projectFirst(4)
+						.types(Integer.class, String.class, String.class, String.class, Double.class);
+
+		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
+		DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue = 
+				customerWithNation.join(revenueOfCustomerKey)
+				.where(0).equalTo(0)
+				.projectFirst(0,1,2,3,4).projectSecond(1)
+				.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
+
+		// emit result
+		customerWithRevenue.writeAsCsv(outputPath);
+		
+		// execute program
+		env.execute("TPCH Query 10 Example");
+		
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static String customerPath;
+	private static String ordersPath;
+	private static String lineitemPath;
+	private static String nationPath;
+	private static String outputPath;
+	
+	private static boolean parseParameters(String[] programArguments) {
+		
+		if(programArguments.length > 0) {
+			if(programArguments.length == 5) {
+				customerPath = programArguments[0];
+				ordersPath = programArguments[1];
+				lineitemPath = programArguments[2];
+				nationPath = programArguments[3];
+				outputPath = programArguments[4];
+			} else {
+				System.err.println("Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
+				return false;
+			}
+		} else {
+			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+								"  Due to legal restrictions, we can not ship generated data.\n" +
+								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + 
+								"  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
+			return false;
+		}
+		return true;
+	}
+	
+	private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(customerPath)
+					.fieldDelimiter('|')
+					.includeFields("11110100")
+					.types(Integer.class, String.class, String.class, Integer.class, Double.class);
+	}
+	
+	private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(ordersPath)
+					.fieldDelimiter('|')
+					.includeFields("110010000")
+					.types(Integer.class, Integer.class, String.class);
+	}
+
+	private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(lineitemPath)
+					.fieldDelimiter('|')
+					.includeFields("1000011010000000")
+					.types(Integer.class, Double.class, Double.class, String.class);
+	}
+	
+	private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(nationPath)
+					.fieldDelimiter('|')
+					.includeFields("1100")
+					.types(Integer.class, String.class);
+	}
+			
+}
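
The two lambdas at the heart of TPCHQuery10 above (the year filter on `o_orderdate` and the per-item revenue map) can be tried out in isolation with plain Java 8 streams; the sample dates below are made up for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class Tpch10Lambdas {

	public static void main(String[] args) {
		// Order dates as "yyyy-MM-dd"; keep years after 1990, mirroring:
		//   orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
		List<String> orderDates = Arrays.asList("1989-03-02", "1994-07-30", "1992-01-15");
		List<String> kept = orderDates.stream()
				.filter(d -> Integer.parseInt(d.substring(0, 4)) > 1990)
				.collect(Collectors.toList());
		System.out.println(kept); // [1994-07-30, 1992-01-15]

		// Revenue per item = l_extendedprice * (1 - l_discount), mirroring:
		//   .map(i -> new Tuple2<>(i.f0, i.f1 * (1 - i.f2)))
		double extendedPrice = 100.0;
		double discount = 0.05;
		System.out.println(extendedPrice * (1 - discount)); // 95.0
	}
}
```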

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
new file mode 100644
index 0000000..793962f
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java8.wordcount;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.example.java8.wordcount.util.WordCountData;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over text files. 
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a compact Flink program with Java 8 Lambda Expressions.
+ * </ul>
+ * 
+ */
+public class WordCount {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		DataSet<String> text = getTextDataSet(env);
+		
+		DataSet<Tuple2<String, Integer>> counts = 
+				// normalize and split each line
+				text.map(line -> line.toLowerCase().split("\\W+"))
+				// convert the split line into pairs (2-tuples) containing: (word,1)
+				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
+					// emit the pairs with non-zero-length words
+					Arrays.stream(tokens)
+					.filter(t -> t.length() > 0)
+					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
+				})
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.sum(1);
+
+		// emit result
+		if(fileOutput) {
+			counts.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			counts.print();
+		}
+		
+		// execute program
+		env.execute("WordCount Example");
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+	
+	private static boolean parseParameters(String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+	
+	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return WordCountData.getDefaultTextLineDataSet(env);
+		}
+	}
+}
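
[Editor's note: the tokenize-and-count pipeline in the WordCount diff above (lowercase, split on `\W+`, drop empty tokens, count per word) can be sketched with plain `java.util.stream`, no Flink runtime required. The class name `WordCountSketch` and the use of `Collectors.groupingBy` in place of Flink's `groupBy(0).sum(1)` are illustrative choices, not part of the commit.]

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordCountSketch {

	// Tokenize like the Flink example: lowercase, split on non-word characters,
	// drop empty tokens, then count occurrences per word.
	public static Map<String, Long> countWords(Stream<String> lines) {
		return lines
				.map(line -> line.toLowerCase().split("\\W+"))
				.flatMap(Arrays::stream)
				.filter(t -> t.length() > 0)
				.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
	}

	public static void main(String[] args) {
		Map<String, Long> counts = countWords(Stream.of("To be, or not to be"));
		System.out.println(counts.get("be")); // 2
		System.out.println(counts.get("or")); // 1
	}
}
```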

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
new file mode 100644
index 0000000..9933696
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java8.wordcount.util;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Provides the default data sets used for the WordCount example program.
+ * The default data sets are used, if no parameters are given to the program.
+ *
+ */
+public class WordCountData {
+
+	public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
+		
+		return env.fromElements(
+				"To be, or not to be,--that is the question:--",
+				"Whether 'tis nobler in the mind to suffer",
+				"The slings and arrows of outrageous fortune",
+				"Or to take arms against a sea of troubles,",
+				"And by opposing end them?--To die,--to sleep,--",
+				"No more; and by a sleep to say we end",
+				"The heartache, and the thousand natural shocks",
+				"That flesh is heir to,--'tis a consummation",
+				"Devoutly to be wish'd. To die,--to sleep;--",
+				"To sleep! perchance to dream:--ay, there's the rub;",
+				"For in that sleep of death what dreams may come,",
+				"When we have shuffled off this mortal coil,",
+				"Must give us pause: there's the respect",
+				"That makes calamity of so long life;",
+				"For who would bear the whips and scorns of time,",
+				"The oppressor's wrong, the proud man's contumely,",
+				"The pangs of despis'd love, the law's delay,",
+				"The insolence of office, and the spurns",
+				"That patient merit of the unworthy takes,",
+				"When he himself might his quietus make",
+				"With a bare bodkin? who would these fardels bear,",
+				"To grunt and sweat under a weary life,",
+				"But that the dread of something after death,--",
+				"The undiscover'd country, from whose bourn",
+				"No traveller returns,--puzzles the will,",
+				"And makes us rather bear those ills we have",
+				"Than fly to others that we know not of?",
+				"Thus conscience does make cowards of us all;",
+				"And thus the native hue of resolution",
+				"Is sicklied o'er with the pale cast of thought;",
+				"And enterprises of great pith and moment,",
+				"With this regard, their currents turn awry,",
+				"And lose the name of action.--Soft you now!",
+				"The fair Ophelia!--Nymph, in thy orisons",
+				"Be all my sins remember'd."
+				);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
new file mode 100644
index 0000000..fa85f8c
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.type.lambdas;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import org.junit.Assert;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class LambdaExtractionTest {
+
+	@Test
+	public void testIdentifyLambdas() {
+		try {
+			MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
+				@Override
+				public Integer map(String value) { return Integer.parseInt(value); }
+			};
+			
+			MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
+				@Override
+				public Integer map(String value) { return Integer.parseInt(value); }
+			};
+			
+			MapFunction<?, ?> fromProperClass = new StaticMapper();
+			
+			MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
+				@Override
+				public Tuple2<Integer, Long> map(Integer value) {
+					return new Tuple2<Integer, Long>(value, 1L);
+				}
+			};
+			
+			MapFunction<String, Integer> lambda = (str) -> Integer.parseInt(str);
+			
+			assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromInterface));
+			assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromClass));
+			assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromProperClass));
+			assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromDerived));
+			assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(lambda));
+			assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(STATIC_LAMBDA));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	public static class StaticMapper implements MapFunction<String, Integer> {
+
+		@Override
+		public Integer map(String value) { return Integer.parseInt(value); }
+	}
+	
+	public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
+
+		@Override
+		public Tuple2<T, Long> map(T value) throws Exception;
+	}
+	
+	private static final MapFunction<String, Integer> STATIC_LAMBDA = (str) -> Integer.parseInt(str);
+	
+	public static class MyClass {
+		private String s = "mystring";
+		
+		public MapFunction<Integer, String> getMapFunction() {
+			return (i) -> s;
+		}
+	}
+	
+	@Test
+	public void testLambdaWithMemberVariable() {		
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), TypeInfoParser.parse("Integer"));
+		Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testLambdaWithLocalVariable() {
+		String s = "mystring";
+		final int k = 24;
+		int j = 26;
+		
+		MapFunction<Integer, String> f = (i) -> s + k + j;
+		
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Integer"));
+		Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testMapLambda() {
+		MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
+		
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testFlatMapLambda() {
+		FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
+		
+		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testMapPartitionLambda() {
+		MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
+		
+		TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testGroupReduceLambda() {
+		GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
+		
+		TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testFlatJoinLambda() {
+		FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
+		
+		TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testJoinLambda() {
+		JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
+		
+		TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testCoGroupLambda() {
+		CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
+		
+		TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testCrossLambda() {
+		CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
+		
+		TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testKeySelectorLambda() {
+		KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
+		
+		TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testLambdaTypeErasureException() {
+		MapFunction<Tuple1, Tuple1> f = (i) -> null;
+		
+		try {
+			TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple1<String>"));
+			Assert.fail();
+		}
+		catch (InvalidTypesException e) {
+			// ok
+		}
+	}
+	
+}
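
[Editor's note: the test above relies on Flink's `FunctionUtils.checkAndExtractLambdaMethod` to tell lambdas apart from anonymous classes. A much simplified, JVM-implementation-specific stand-in is sketched below: on HotSpot-style JVMs, classes generated for lambda expressions have names containing `$$Lambda`, while anonymous inner classes are named like `Outer$1`. The class name `LambdaIdentification` and the name-based heuristic are assumptions for illustration; Flink's real check inspects the function's methods via reflection.]

```java
import java.util.function.Function;

public class LambdaIdentification {

	// Heuristic stand-in for lambda detection: lambda classes generated by the
	// JVM carry "$$Lambda" in their class name; anonymous classes do not.
	// This is implementation-specific behavior, not a supported API.
	public static boolean isLambda(Object f) {
		return f.getClass().getName().contains("$$Lambda");
	}

	public static void main(String[] args) {
		Function<String, Integer> lambda = s -> Integer.parseInt(s);
		Function<String, Integer> anonymous = new Function<String, Integer>() {
			@Override
			public Integer apply(String s) { return Integer.parseInt(s); }
		};
		System.out.println(isLambda(lambda));     // true on HotSpot-style JVMs
		System.out.println(isLambda(anonymous));  // false
	}
}
```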

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
new file mode 100644
index 0000000..1420483
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class AllGroupReduceITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "aaabacad\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+		DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
+			String conc = "";
+			for (String s : values) {
+				conc = conc.concat(s);
+			}
+			out.collect(conc);
+		});
+		concatDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
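
[Editor's note: the `reduceGroup` lambda in the ITCase above concatenates every element of a group; with no grouping key, all elements fall into one group, so "aa", "ab", "ac", "ad" yields "aaabacad". The same fold can be sketched without Flink; the class name `GroupReduceSketch` is illustrative.]

```java
import java.util.List;

public class GroupReduceSketch {

	// Mirror of the ITCase's group-reduce lambda: concatenate all values
	// of the (single, ungrouped) group in iteration order.
	public static String concatAll(List<String> values) {
		StringBuilder conc = new StringBuilder();
		for (String s : values) {
			conc.append(s);
		}
		return conc.toString();
	}

	public static void main(String[] args) {
		System.out.println(concatAll(List.of("aa", "ab", "ac", "ad"))); // aaabacad
	}
}
```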

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
new file mode 100644
index 0000000..667a786
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class CoGroupITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "6\n3\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+				);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+				new Tuple2<Integer, String>(1, "not"),
+				new Tuple2<Integer, String>(1, "much"),
+				new Tuple2<Integer, String>(2, "really")
+				);
+		DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
+				.with((values1, values2, out) -> {
+					int sum = 0;
+					for (Tuple2<Integer, String> next : values1) {
+						sum += next.f0;
+					}
+					for (Tuple2<Integer, String> next : values2) {
+						sum += next.f0;
+					}
+					out.collect(sum);
+				});
+		joined.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
new file mode 100644
index 0000000..60916c9
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class CrossITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "2,hello not\n" +
+			"3,what's not\n" +
+			"3,up not\n" +
+			"2,hello much\n" +
+			"3,what's much\n" +
+			"3,up much\n" +
+			"3,hello really\n" +
+			"4,what's really\n" +
+			"4,up really";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+				);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+				new Tuple2<Integer, String>(1, "not"),
+				new Tuple2<Integer, String>(1, "much"),
+				new Tuple2<Integer, String>(2, "really")
+				);
+		DataSet<Tuple2<Integer,String>> joined = left.cross(right)
+				.with((t,s) -> new Tuple2<Integer, String> (t.f0 + s.f0, t.f1 + " " + s.f1));
+		joined.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
new file mode 100644
index 0000000..d83db06
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class FilterITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n";
+
+	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
+		data.add(new Tuple3<Integer, Long, String>(1, 1L, "Hi"));
+		data.add(new Tuple3<Integer, Long, String>(2, 2L, "Hello"));
+		data.add(new Tuple3<Integer, Long, String>(3, 2L, "Hello world"));
+		data.add(new Tuple3<Integer, Long, String>(4, 3L, "Hello world, how are you?"));
+		data.add(new Tuple3<Integer, Long, String>(5, 3L, "I am fine."));
+		data.add(new Tuple3<Integer, Long, String>(6, 3L, "Luke Skywalker"));
+		data.add(new Tuple3<Integer, Long, String>(7, 4L, "Comment#1"));
+		data.add(new Tuple3<Integer, Long, String>(8, 4L, "Comment#2"));
+		data.add(new Tuple3<Integer, Long, String>(9, 4L, "Comment#3"));
+		data.add(new Tuple3<Integer, Long, String>(10, 4L, "Comment#4"));
+		data.add(new Tuple3<Integer, Long, String>(11, 5L, "Comment#5"));
+		data.add(new Tuple3<Integer, Long, String>(12, 5L, "Comment#6"));
+		data.add(new Tuple3<Integer, Long, String>(13, 5L, "Comment#7"));
+		data.add(new Tuple3<Integer, Long, String>(14, 5L, "Comment#8"));
+		data.add(new Tuple3<Integer, Long, String>(15, 5L, "Comment#9"));
+		data.add(new Tuple3<Integer, Long, String>(16, 6L, "Comment#10"));
+		data.add(new Tuple3<Integer, Long, String>(17, 6L, "Comment#11"));
+		data.add(new Tuple3<Integer, Long, String>(18, 6L, "Comment#12"));
+		data.add(new Tuple3<Integer, Long, String>(19, 6L, "Comment#13"));
+		data.add(new Tuple3<Integer, Long, String>(20, 6L, "Comment#14"));
+		data.add(new Tuple3<Integer, Long, String>(21, 6L, "Comment#15"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+				filter(value -> value.f2.contains("world"));
+		filterDs.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
new file mode 100644
index 0000000..714c14c
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class FlatJoinITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "2,what's really\n" +
+			"2,up really\n" +
+			"1,hello not\n" +
+			"1,hello much\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+				);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+				new Tuple2<Integer, String>(1, "not"),
+				new Tuple2<Integer, String>(1, "much"),
+				new Tuple2<Integer, String>(2, "really")
+				);
+		DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
+				.with((t,s,out) -> out.collect(new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1)));
+		joined.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
new file mode 100644
index 0000000..2b0e344
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class FlatMapITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "bb\n" +
+			"bb\n" +
+			"bc\n" +
+			"bd\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+		DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
+		flatMappedDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
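FlatMapITCase above is the smallest illustration of the new lambda support: Flink derives a FlatMapFunction from a two-argument (value, collector) lambda. For readers new to Java 8 lambdas themselves, the same replace-and-collect transformation can be sketched with the JDK's own Stream API, with no Flink dependency at all (class and method names below are illustrative only, not part of Flink):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapSketch {
    // Mirrors the test's lambda: each input element yields one output
    // element with every 'a' replaced by 'b'. Stream.flatMap expects a
    // stream per element, so this one-to-one case wraps each result in
    // Stream.of(...).
    static List<String> replaceAWithB(List<String> input) {
        return input.stream()
                .flatMap(s -> Stream.of(s.replace("a", "b")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(replaceAWithB(Arrays.asList("aa", "ab", "ac", "ad")));
    }
}
```

Flink's variant differs in that the lambda receives an explicit Collector instead of returning a value, so it may emit zero or more records per input element.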

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
new file mode 100644
index 0000000..23300c8
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class GroupReduceITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "abad\n" +
+			"aaac\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer,String>> stringDs = env.fromElements(
+				new Tuple2<Integer,String>(1, "aa"),
+				new Tuple2<Integer,String>(2, "ab"),
+				new Tuple2<Integer,String>(1, "ac"),
+				new Tuple2<Integer,String>(2, "ad")
+				);
+		DataSet<String> concatDs = stringDs
+				.groupBy(0)
+				.reduceGroup((values, out) -> {
+					String conc = "";
+					for (Tuple2<Integer,String> next : values) {
+						conc = conc.concat(next.f1);
+					}
+					out.collect(conc);
+				});
+		concatDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
new file mode 100644
index 0000000..aef35ac
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class JoinITCase extends JavaProgramTestBase {
+	
+	private static final String EXPECTED_RESULT = "2,what's really\n" +
+			"2,up really\n" +
+			"1,hello not\n" +
+			"1,hello much\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+				);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+				new Tuple2<Integer, String>(1, "not"),
+				new Tuple2<Integer, String>(1, "much"),
+				new Tuple2<Integer, String>(2, "really")
+				);
+		DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
+				.with((t,s) -> new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1));
+		joined.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
+
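The join lambda in JoinITCase pairs records whose key fields match and concatenates their string fields into a new Tuple2. Outside Flink, the same equi-join can be sketched as a plain hash join over collections (all names below are illustrative, not Flink API):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class JoinSketch {
    // A hash-join sketch: index the right side by key, then for each
    // left record emit "key,leftWord rightWord" per matching right
    // record -- the same pairing JoinITCase's lambda builds as a Tuple2.
    static List<String> join(List<SimpleEntry<Integer, String>> left,
                             List<SimpleEntry<Integer, String>> right) {
        Map<Integer, List<String>> rightByKey = right.stream().collect(
                Collectors.groupingBy(SimpleEntry::getKey,
                        Collectors.mapping(SimpleEntry::getValue, Collectors.toList())));
        return left.stream()
                .filter(l -> rightByKey.containsKey(l.getKey()))
                .flatMap(l -> rightByKey.get(l.getKey()).stream()
                        .map(r -> l.getKey() + "," + l.getValue() + " " + r))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SimpleEntry<Integer, String>> left = Arrays.asList(
                new SimpleEntry<>(1, "hello"),
                new SimpleEntry<>(2, "what's"),
                new SimpleEntry<>(2, "up"));
        List<SimpleEntry<Integer, String>> right = Arrays.asList(
                new SimpleEntry<>(1, "not"),
                new SimpleEntry<>(1, "much"),
                new SimpleEntry<>(2, "really"));
        System.out.println(join(left, right));
    }
}
```

Note that Flink's `.with((t,s) -> ...)` form returns one record per matching pair, while FlatJoinITCase's three-argument `(t,s,out)` form uses a Collector and may emit any number.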

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
new file mode 100644
index 0000000..d4cf585
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class MapITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "bb\n" +
+			"bb\n" +
+			"bc\n" +
+			"bd\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+		DataSet<String> mappedDs = stringDs.map(s -> s.replace("a", "b"));
+		mappedDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
new file mode 100644
index 0000000..52c215f
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class ReduceITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
+			"2,3,2,Hallo Welt wie,1\n" +
+			"2,2,1,Hallo Welt,2\n" +
+			"3,9,0,P-),2\n" +
+			"3,6,5,BCD,3\n" +
+			"4,17,0,P-),1\n" +
+			"4,17,0,P-),2\n" +
+			"5,11,10,GHI,1\n" +
+			"5,29,0,P-),2\n" +
+			"5,25,0,P-),3\n";
+	
+	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(1,1L,0,"Hallo",1L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,2L,1,"Hallo Welt",2L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,3L,2,"Hallo Welt wie",1L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,4L,3,"Hallo Welt wie gehts?",2L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,5L,4,"ABC",2L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,6L,5,"BCD",3L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,7L,6,"CDE",2L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,8L,7,"DEF",1L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,9L,8,"EFG",1L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,10L,9,"FGH",2L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,11L,10,"GHI",1L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,12L,11,"HIJ",3L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,13L,12,"IJK",3L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,14L,13,"JKL",2L));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,15L,14,"KLM",2L));

+
+		Collections.shuffle(data);
+
+		TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> type = new
+				TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+	
+	private String resultPath;
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+				.groupBy(4, 0)
+				.reduce((in1, in2) -> {
+					Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
+					out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
+					return out;
+				});
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
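ReduceITCase groups its Tuple5 data set on two fields and folds each group pairwise, summing field f1. The grouping-plus-aggregation pattern it exercises can be sketched, minus Flink's tuple types, with Collectors.groupingBy and a summing collector (all names below are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupReduceSketch {
    // Each record is {key, value}; mirrors grouping on a key field and
    // summing another field, as the Flink test does with
    // groupBy(...).reduce(...).
    static Map<Integer, Long> sumPerKey(List<int[]> records) {
        return records.stream().collect(
                Collectors.groupingBy(
                        r -> r[0],                        // group key
                        Collectors.summingLong(r -> r[1]) // per-group sum
                ));
    }

    public static void main(String[] args) {
        List<int[]> data = Arrays.asList(
                new int[]{1, 1}, new int[]{2, 2}, new int[]{2, 3});
        System.out.println(sumPerKey(data));
    }
}
```

Flink's reduce lambda is a pairwise combiner `(in1, in2) -> out` rather than a collector, which lets the runtime apply it incrementally and in parallel within each group.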

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-quickstart/README.md
----------------------------------------------------------------------
diff --git a/flink-quickstart/README.md b/flink-quickstart/README.md
index 9ba0b16..e81cd57 100644
--- a/flink-quickstart/README.md
+++ b/flink-quickstart/README.md
@@ -25,3 +25,7 @@ The `quickstart.sh` script always points to the current stable release (v0.4, v0
 
 
 (Use `-DarchetypeCatalog=local` for local testing during archetype development)
+
+# Java 8 with Lambda Expressions
+
+If you are planning to use Java 8 and want to use lambda expressions, please open the generated `pom.xml` file and modify/uncomment the lines mentioned there.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index bd81df5..3ca5c35 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -47,7 +47,7 @@ under the License.
 	</repositories>
 	
 	<!-- These two requirements are the minimum to use and develop Flink. 
-		You can add others like <artifactId>pact-scala-core</artifactId> for Scala! -->
+		You can add others like <artifactId>flink-scala</artifactId> for Scala! -->
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -95,10 +95,69 @@ under the License.
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>3.1</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.6</source> <!-- If you want to use Java 8, change this to "1.8" -->
+					<target>1.6</target> <!-- If you want to use Java 8, change this to "1.8" -->
 				</configuration>
 			</plugin>
 		</plugins>
+		
+		<!-- If you want to use Java 8 lambda expressions, uncomment the following lines. -->
+		<!-- <pluginManagement>
+			<plugins>
+				<plugin>
+					<artifactId>maven-compiler-plugin</artifactId>
+					<configuration>
+						<source>1.8</source>
+						<target>1.8</target>
+						<compilerId>jdt</compilerId>
+					</configuration>
+					<dependencies>
+						<dependency>
+							<groupId>org.eclipse.tycho</groupId>
+							<artifactId>tycho-compiler-jdt</artifactId>
+							<version>0.21.0</version>
+						</dependency>
+					</dependencies>
+				</plugin>
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-assembly-plugin</artifactId>
+										<versionRange>[2.4,)</versionRange>
+										<goals>
+											<goal>single</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-compiler-plugin</artifactId>
+										<versionRange>[3.1,)</versionRange>
+										<goals>
+											<goal>testCompile</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+		-->
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
index 4e424da..840b948 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
@@ -53,7 +53,7 @@ public class Job {
 		 * 	.filter()
 		 * 	.flatMap()
 		 * 	.join()
-		 * 	.group()
+		 * 	.coGroup()
 		 * and many more.
 		 * Have a look at the programming guide for the Java API:
 		 *

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f18b4c5..3874849 100644
--- a/pom.xml
+++ b/pom.xml
@@ -354,7 +354,7 @@ under the License.
 					<jdk>1.8</jdk>
 				</activation>
 				<modules>
-					<module>flink-java8-tests</module>
+					<module>flink-java8</module>
 				</modules>
 				<build>
 					<plugins>