Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/20 06:07:22 UTC

[3/3] flink git commit: [FLINK-7251] [types] Remove the flink-java8 module and improve lambda type extraction

[FLINK-7251] [types] Remove the flink-java8 module and improve lambda type extraction

This commit removes the flink-java8 module and merges some tests into flink-core/flink-runtime. It ensures that explicit type information can be passed in the DataStream API as a fallback. Since the Tycho compiler approach was very hacky and no longer seems to work, this commit also removes all references to it in the docs and quickstarts.

This closes #6120.
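
For context, the explicit type information fallback mentioned above is the returns(...) call on a transformation. A minimal sketch against the DataStream API; the pipeline itself is illustrative, not part of this commit:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements(1, 2, 3)
    // the lambda's Tuple2 type parameters are erased at compile time
    .map(i -> Tuple2.of(i, i))
    // fallback: declare the output type explicitly
    .returns(Types.TUPLE(Types.INT, Types.INT))
    .print();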


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ddba1b69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ddba1b69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ddba1b69

Branch: refs/heads/master
Commit: ddba1b69f43cbb885e178dfaafa120f1fe196a13
Parents: 95eadfe
Author: Timo Walther <tw...@apache.org>
Authored: Mon Jun 4 12:49:43 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Fri Jul 20 08:06:07 2018 +0200

----------------------------------------------------------------------
 docs/dev/java8.md                               | 198 ----------
 docs/dev/java_lambdas.md                        | 138 +++++++
 .../api/java/typeutils/TypeExtractionUtils.java |  23 ++
 .../flink/api/java/typeutils/TypeExtractor.java | 290 ++++++--------
 .../java/typeutils/LambdaExtractionTest.java    | 340 ++++++++++++++++
 .../examples/java/relational/TPCHQuery10.java   |  28 +-
 .../examples/java/wordcount/WordCount.java      |   3 +-
 .../streaming/examples/wordcount/WordCount.java |  14 +-
 flink-java8/pom.xml                             | 225 -----------
 .../examples/java8/relational/TPCHQuery10.java  | 212 ----------
 .../examples/java8/wordcount/WordCount.java     | 124 ------
 .../examples/java8/wordcount/WordCount.java     | 124 ------
 .../java/type/lambdas/LambdaExtractionTest.java | 383 -------------------
 .../org/apache/flink/cep/CEPLambdaTest.java     | 104 -----
 .../runtime/util/JarFileCreatorLambdaTest.java  | 113 ------
 .../util/jartestprogram/FilterLambda1.java      |  41 --
 .../util/jartestprogram/FilterLambda2.java      |  39 --
 .../util/jartestprogram/FilterLambda3.java      |  39 --
 .../util/jartestprogram/FilterLambda4.java      |  38 --
 .../util/jartestprogram/UtilFunction.java       |  30 --
 .../jartestprogram/UtilFunctionWrapper.java     |  35 --
 .../runtime/util/jartestprogram/WordFilter.java |  28 --
 .../operators/lambdas/AllGroupReduceITCase.java |  59 ---
 .../java/operators/lambdas/CoGroupITCase.java   |  74 ----
 .../api/java/operators/lambdas/CrossITCase.java |  73 ----
 .../java/operators/lambdas/FilterITCase.java    |  91 -----
 .../java/operators/lambdas/FlatJoinITCase.java  |  68 ----
 .../java/operators/lambdas/FlatMapITCase.java   |  56 ---
 .../operators/lambdas/GroupReduceITCase.java    |  69 ----
 .../api/java/operators/lambdas/JoinITCase.java  |  69 ----
 .../api/java/operators/lambdas/MapITCase.java   |  74 ----
 .../java/operators/lambdas/ReduceITCase.java    | 109 ------
 .../src/test/resources/log4j-test.properties    |  19 -
 .../org/apache/flink/cep/PatternStream.java     |  16 +-
 .../java/org/apache/flink/cep/CEPITCase.java    |  35 +-
 .../flink/graph/asm/translate/Translate.java    |   4 -
 .../main/resources/archetype-resources/pom.xml  |  19 -
 .../flink/runtime/util/JarFileCreatorTest.java  |  91 ++++-
 .../jartestprogram/FilterWithIndirection.java   |  38 ++
 .../util/jartestprogram/FilterWithLambda.java   |  40 ++
 .../FilterWithMethodReference.java              |  41 ++
 .../util/jartestprogram/UtilFunction.java       |  32 ++
 .../jartestprogram/UtilFunctionWrapper.java     |  37 ++
 .../runtime/util/jartestprogram/WordFilter.java |  29 ++
 .../api/datastream/AllWindowedStream.java       |   2 -
 .../api/datastream/AsyncDataStream.java         |   1 -
 .../datastream/BroadcastConnectedStream.java    |   4 -
 .../api/datastream/CoGroupedStreams.java        |  40 +-
 .../api/datastream/ConnectedStreams.java        |  28 +-
 .../streaming/api/datastream/DataStream.java    |  16 +-
 .../api/datastream/IterativeStream.java         |   4 +
 .../streaming/api/datastream/JoinedStreams.java |  43 ++-
 .../streaming/api/datastream/KeyedStream.java   |  65 ++--
 .../api/datastream/WindowedStream.java          |   2 -
 .../flink/streaming/api/TypeFillTest.java       |  65 +++-
 .../flink/test/operators/CoGroupITCase.java     |  33 ++
 .../apache/flink/test/operators/MapITCase.java  |  34 ++
 .../flink/test/operators/ReduceITCase.java      |   6 +-
 pom.xml                                         |   1 -
 59 files changed, 1223 insertions(+), 2833 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/docs/dev/java8.md
----------------------------------------------------------------------
diff --git a/docs/dev/java8.md b/docs/dev/java8.md
deleted file mode 100644
index 8e7e643..0000000
--- a/docs/dev/java8.md
+++ /dev/null
@@ -1,198 +0,0 @@
----
-title: "Java 8"
-nav-parent_id: api-concepts
-nav-pos: 20
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Java 8 introduces several new language features designed for faster and clearer coding. With the most important feature,
-the so-called "Lambda Expressions", Java 8 opens the door to functional programming. Lambda Expressions allow for implementing and
-passing functions in a straightforward way without having to declare additional (anonymous) classes.
-
-The newest version of Flink supports the usage of Lambda Expressions for all operators of the Java API.
-This document shows how to use Lambda Expressions and describes current limitations. For a general introduction to the
-Flink API, please refer to the [Programming Guide]({{ site.baseurl }}/dev/api_concepts.html)
-
-* TOC
-{:toc}
-
-### Examples
-
-The following example illustrates how to implement a simple, inline `map()` function that squares its input using a Lambda Expression.
-The types of input `i` and output parameters of the `map()` function need not to be declared as they are inferred by the Java 8 compiler.
-
-{% highlight java %}
-env.fromElements(1, 2, 3)
-// returns the squared i
-.map(i -> i*i)
-.print();
-{% endhighlight %}
-
-The next two examples show different implementations of a function that uses a `Collector` for output.
-Functions, such as `flatMap()`, require an output type (in this case `String`) to be defined for the `Collector` in order to be type-safe.
-If the `Collector` type can not be inferred from the surrounding context, it needs to be declared in the Lambda Expression's parameter list manually.
-Otherwise the output will be treated as type `Object` which can lead to undesired behaviour.
-
-{% highlight java %}
-DataSet<Integer> input = env.fromElements(1, 2, 3);
-
-// collector type must be declared
-input.flatMap((Integer number, Collector<String> out) -> {
-    StringBuilder builder = new StringBuilder();
-    for(int i = 0; i < number; i++) {
-        builder.append("a");
-        out.collect(builder.toString());
-    }
-})
-// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
-.print();
-{% endhighlight %}
-
-{% highlight java %}
-DataSet<Integer> input = env.fromElements(1, 2, 3);
-
-// collector type must not be declared, it is inferred from the type of the dataset
-DataSet<String> manyALetters = input.flatMap((number, out) -> {
-    StringBuilder builder = new StringBuilder();
-    for(int i = 0; i < number; i++) {
-       builder.append("a");
-       out.collect(builder.toString());
-    }
-});
-
-// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
-manyALetters.print();
-{% endhighlight %}
-
-The following code demonstrates a word count which makes extensive use of Lambda Expressions.
-
-{% highlight java %}
-DataSet<String> input = env.fromElements("Please count", "the words", "but not this");
-
-// filter out strings that contain "not"
-input.filter(line -> !line.contains("not"))
-// split each line by space
-.map(line -> line.split(" "))
-// emit a pair <word,1> for each array element
-.flatMap((String[] wordArray, Collector<Tuple2<String, Integer>> out)
-    -> Arrays.stream(wordArray).forEach(t -> out.collect(new Tuple2<>(t, 1)))
-    )
-// group and sum up
-.groupBy(0).sum(1)
-// print
-.print();
-{% endhighlight %}
-
-### Compiler Limitations
-Currently, Flink only supports jobs containing Lambda Expressions completely if they are **compiled with the Eclipse JDT compiler contained in Eclipse Luna 4.4.2 (and above)**.
-
-Only the Eclipse JDT compiler preserves the generic type information necessary to use the entire Lambda Expressions feature type-safely.
-Other compilers such as the OpenJDK's and Oracle JDK's `javac` throw away all generic parameters related to Lambda Expressions. This means that types such as `Tuple2<String, Integer>` or `Collector<String>` declared as a Lambda function input or output parameter will be pruned to `Tuple2` or `Collector` in the compiled `.class` files, which is too little information for the Flink compiler.
-
-How to compile a Flink job that contains Lambda Expressions with the JDT compiler will be covered in the next section.
-
-However, it is possible to implement functions such as `map()` or `filter()` with Lambda Expressions in Java 8 compilers other than the Eclipse JDT compiler as long as the function has no `Collector`s or `Iterable`s *and* only if the function handles unparameterized types such as `Integer`, `Long`, `String`, `MyOwnClass` (types without Generics!).
-
-#### Compile Flink jobs with the Eclipse JDT compiler and Maven
-
-If you are using the Eclipse IDE, you can run and debug your Flink code within the IDE without any problems after some configuration steps. The Eclipse IDE by default compiles its Java sources with the Eclipse JDT compiler. The next section describes how to configure the Eclipse IDE.
-
-If you are using a different IDE such as IntelliJ IDEA or you want to package your Jar-File with Maven to run your job on a cluster, you need to modify your project's `pom.xml` file and build your program with Maven. The [quickstart]({{site.baseurl}}/quickstart/setup_quickstart.html) contains preconfigured Maven projects which can be used for new projects or as a reference. Uncomment the mentioned lines in your generated quickstart `pom.xml` file if you want to use Java 8 with Lambda Expressions.
-
-Alternatively, you can manually insert the following lines to your Maven `pom.xml` file. Maven will then use the Eclipse JDT compiler for compilation.
-
-{% highlight xml %}
-<!-- put these lines under "project/build/pluginManagement/plugins" of your pom.xml -->
-
-<plugin>
-    <!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
-    <artifactId>maven-compiler-plugin</artifactId>
-    <configuration>
-        <source>1.8</source>
-        <target>1.8</target>
-        <compilerId>jdt</compilerId>
-    </configuration>
-    <dependencies>
-        <!-- This dependency provides the implementation of compiler "jdt": -->
-        <dependency>
-            <groupId>org.eclipse.tycho</groupId>
-            <artifactId>tycho-compiler-jdt</artifactId>
-            <version>0.21.0</version>
-        </dependency>
-    </dependencies>
-</plugin>
-{% endhighlight %}
-
-If you are using Eclipse for development, the m2e plugin might complain about the inserted lines above and marks your `pom.xml` as invalid. If so, insert the following lines to your `pom.xml`.
-
-{% highlight xml %}
-<!-- put these lines under "project/build/pluginManagement/plugins/plugin[groupId="org.eclipse.m2e", artifactId="lifecycle-mapping"]/configuration/lifecycleMappingMetadata/pluginExecutions" of your pom.xml -->
-
-<pluginExecution>
-    <pluginExecutionFilter>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <versionRange>[3.1,)</versionRange>
-        <goals>
-            <goal>testCompile</goal>
-            <goal>compile</goal>
-        </goals>
-    </pluginExecutionFilter>
-    <action>
-        <ignore></ignore>
-    </action>
-</pluginExecution>
-{% endhighlight %}
-
-#### Run and debug Flink jobs within the Eclipse IDE
-
-First of all, make sure you are running a current version of Eclipse IDE (4.4.2 or later). Also make sure that you have a Java 8 Runtime Environment (JRE) installed in Eclipse IDE (`Window` -> `Preferences` -> `Java` -> `Installed JREs`).
-
-Create/Import your Eclipse project.
-
-If you are using Maven, you also need to change the Java version in your `pom.xml` for the `maven-compiler-plugin`. Otherwise right click the `JRE System Library` section of your project and open the `Properties` window in order to switch to a Java 8 JRE (or above) that supports Lambda Expressions.
-
-The Eclipse JDT compiler needs a special compiler flag in order to store type information in `.class` files. Open the JDT configuration file at `{project directory}/.settings/org.eclipse.jdt.core.prefs` with your favorite text editor and add the following line:
-
-{% highlight plain %}
-org.eclipse.jdt.core.compiler.codegen.lambda.genericSignature=generate
-{% endhighlight %}
-
-If not already done, also modify the Java versions of the following properties to `1.8` (or above):
-
-{% highlight plain %}
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
-org.eclipse.jdt.core.compiler.compliance=1.8
-org.eclipse.jdt.core.compiler.source=1.8
-{% endhighlight %}
-
-After you have saved the file, perform a complete project refresh in Eclipse IDE.
-
-If you are using Maven, right click your Eclipse project and select `Maven` -> `Update Project...`.
-
-You have configured everything correctly, if the following Flink program runs without exceptions:
-
-{% highlight java %}
-final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.fromElements(1, 2, 3).map((in) -> new Tuple1<String>(" " + in)).print();
-env.execute();
-{% endhighlight %}
-
-{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/docs/dev/java_lambdas.md
----------------------------------------------------------------------
diff --git a/docs/dev/java_lambdas.md b/docs/dev/java_lambdas.md
new file mode 100644
index 0000000..4b306ac
--- /dev/null
+++ b/docs/dev/java_lambdas.md
@@ -0,0 +1,138 @@
+---
+title: "Java Lambda Expressions"
+nav-parent_id: api-concepts
+nav-pos: 20
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Java 8 introduced several new language features designed for faster and clearer coding. With the most important feature,
+the so-called "Lambda Expressions", it opened the door to functional programming. Lambda expressions allow for implementing and
+passing functions in a straightforward way without having to declare additional (anonymous) classes.
+
+<span class="label label-danger">Attention</span> Flink supports the usage of lambda expressions for all operators of the Java API. However, whenever a lambda expression uses Java generics, you need to declare the type information *explicitly*.
+
+This document shows how to use lambda expressions and describes current limitations. For a general introduction to the
+Flink API, please refer to the [Programming Guide]({{ site.baseurl }}/dev/api_concepts.html).
+
+### Examples and Limitations
+
+The following example illustrates how to implement a simple, inline `map()` function that squares its input using a lambda expression.
+The types of the input `i` and the output of the `map()` function need not be declared because they are inferred by the Java compiler.
+
+{% highlight java %}
+env.fromElements(1, 2, 3)
+// returns the squared i
+.map(i -> i*i)
+.print();
+{% endhighlight %}
+
+Flink can automatically extract the result type information from the implementation of the method signature `OUT map(IN value)` because `OUT` is not generic but `Integer`.
+
+Unfortunately, functions such as `flatMap()` with the signature `void flatMap(IN value, Collector<OUT> out)` are compiled into `void flatMap(IN value, Collector out)` by the Java compiler due to type erasure. This makes it impossible for Flink to infer the output type information automatically.
+
+Flink will most likely throw an exception similar to the following:
+
+{% highlight plain %}
+org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
+    In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
+    An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
+    Otherwise the type has to be specified explicitly using type information.
+{% endhighlight %}
+
+In this case, the type information needs to be *specified explicitly*; otherwise the output will be treated as type `Object`, which leads to inefficient serialization.
+
+{% highlight java %}
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.util.Collector;
+
+DataSet<Integer> input = env.fromElements(1, 2, 3);
+
+// collector type must be declared
+input.flatMap((Integer number, Collector<String> out) -> {
+    StringBuilder builder = new StringBuilder();
+    for(int i = 0; i < number; i++) {
+        builder.append("a");
+        out.collect(builder.toString());
+    }
+})
+// provide type information explicitly
+.returns(Types.STRING)
+// prints "a", "a", "aa", "a", "aa", "aaa"
+.print();
+{% endhighlight %}
+
+Similar problems occur when using a `map()` function with a generic return type. A method signature `Tuple2<Integer, Integer> map(Integer value)` is erased to `Tuple2 map(Integer value)` in the example below.
+
+{% highlight java %}
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+env.fromElements(1, 2, 3)
+    .map(i -> Tuple2.of(i, i))    // no information about fields of Tuple2
+    .print();
+{% endhighlight %}
+
+In general, those problems can be solved in multiple ways:
+
+{% highlight java %}
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+// use the explicit ".returns(...)"
+env.fromElements(1, 2, 3)
+    .map(i -> Tuple2.of(i, i))
+    .returns(Types.TUPLE(Types.INT, Types.INT))
+    .print();
+
+// use a class instead
+env.fromElements(1, 2, 3)
+    .map(new MyTuple2Mapper())
+    .print();
+
+public static class MyTuple2Mapper implements MapFunction<Integer, Tuple2<Integer, Integer>> {
+    @Override
+    public Tuple2<Integer, Integer> map(Integer i) {
+        return Tuple2.of(i, i);
+    }
+}
+
+// use an anonymous class instead
+env.fromElements(1, 2, 3)
+    .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
+        @Override
+        public Tuple2<Integer, Integer> map(Integer i) {
+            return Tuple2.of(i, i);
+        }
+    })
+    .print();
+
+// or in this example use a tuple subclass instead
+env.fromElements(1, 2, 3)
+    .map(i -> new DoubleTuple(i, i))
+    .print();
+
+public static class DoubleTuple extends Tuple2<Integer, Integer> {
+    public DoubleTuple(int f0, int f1) {
+        this.f0 = f0;
+        this.f1 = f1;
+    }
+}
+{% endhighlight %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index f005ed9..07f1e1e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -158,6 +158,7 @@ public class TypeExtractionUtils {
 	/**
 	 * Extracts type from given index from lambda. It supports nested types.
 	 *
+	 * @param baseClass SAM function that the lambda implements
 	 * @param exec lambda function to extract the type from
 	 * @param lambdaTypeArgumentIndices position of type to extract in type hierarchy
 	 * @param paramLen count of total parameters of the lambda (including closure parameters)
@@ -165,14 +166,17 @@ public class TypeExtractionUtils {
 	 * @return extracted type
 	 */
 	public static Type extractTypeFromLambda(
+		Class<?> baseClass,
 		LambdaExecutable exec,
 		int[] lambdaTypeArgumentIndices,
 		int paramLen,
 		int baseParametersLen) {
 		Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]];
 		for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
+			validateLambdaType(baseClass, output);
 			output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]);
 		}
+		validateLambdaType(baseClass, output);
 		return output;
 	}
 
@@ -328,4 +332,23 @@ public class TypeExtractionUtils {
 		}
 		return Object.class;
 	}
+
+	/**
+	 * Checks whether the given type has the generic parameters declared in the class definition.
+	 *
+	 * @param t type to be validated
+	 */
+	public static void validateLambdaType(Class<?> baseClass, Type t) {
+		if (!(t instanceof Class)) {
+			return;
+		}
+		final Class<?> clazz = (Class<?>) t;
+
+		if (clazz.getTypeParameters().length > 0) {
+			throw new InvalidTypesException("The generic type parameters of '" + clazz.getSimpleName() + "' are missing. "
+				+ "In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. "
+				+ "An easy workaround is to use an (anonymous) class instead that implements the '" + baseClass.getName() + "' interface. "
+				+ "Otherwise the type has to be specified explicitly using type information.");
+		}
+	}
 }
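
The index path used by extractTypeFromLambda above can be illustrated with plain java.lang.reflect. A sketch under assumptions: DemoFlatMap and LambdaIndexDemo are hypothetical stand-ins for a SAM interface such as FlatMapFunction<IN, OUT>, not Flink classes.

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class LambdaIndexDemo {

	// Hypothetical stand-in for a SAM interface like FlatMapFunction<IN, OUT>,
	// here with IN = Integer and OUT = String already bound.
	interface DemoFlatMap {
		void flatMap(Integer value, List<String> out);
	}

	public static void main(String[] args) throws Exception {
		Method sam = DemoFlatMap.class.getMethod("flatMap", Integer.class, List.class);

		// index path {1, 0}: the first index selects parameter 1 (List<String>) ...
		Type param = sam.getGenericParameterTypes()[1];

		// ... the next index descends into type argument 0 (String), mirroring
		// how extractTypeFromLambda walks lambdaTypeArgumentIndices.
		Type output = ((ParameterizedType) param).getActualTypeArguments()[0];

		System.out.println(output); // class java.lang.String
	}
}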

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index f514384..07b6cfe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -84,6 +84,12 @@ import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClas
 /**
  * A utility for reflection analysis on classes, to determine the return type of implementations of transformation
  * functions.
+ *
+ * <p>NOTES FOR USERS OF THIS CLASS:
+ * Automatic type extraction is a hacky business that depends on a lot of variables such as generics,
+ * compiler, interfaces, etc. The type extraction fails regularly with either {@link MissingTypeInfo} or
+ * hard exceptions. Whenever you use methods of this class, make sure to provide a way to pass custom
+ * type information as a fallback.
  */
 @Public
 public class TypeExtractor {
@@ -171,7 +177,6 @@ public class TypeExtractor {
 			MapFunction.class,
 			0,
 			1,
-			new int[]{0},
 			NO_INDEX,
 			inType,
 			functionName,
@@ -193,7 +198,6 @@ public class TypeExtractor {
 			FlatMapFunction.class,
 			0,
 			1,
-			new int[]{0},
 			new int[]{1, 0},
 			inType,
 			functionName,
@@ -222,7 +226,6 @@ public class TypeExtractor {
 			FoldFunction.class,
 			0,
 			1,
-			new int[]{1},
 			NO_INDEX,
 			inType,
 			functionName,
@@ -241,7 +244,6 @@ public class TypeExtractor {
 			AggregateFunction.class,
 			0,
 			1,
-			new int[]{0},
 			NO_INDEX,
 			inType,
 			functionName,
@@ -261,7 +263,6 @@ public class TypeExtractor {
 			0,
 			2,
 			NO_INDEX,
-			NO_INDEX,
 			inType,
 			functionName,
 			allowMissing);
@@ -281,7 +282,6 @@ public class TypeExtractor {
 			MapPartitionFunction.class,
 			0,
 			1,
-			new int[]{0, 0},
 			new int[]{1, 0},
 			inType,
 			functionName,
@@ -302,7 +302,6 @@ public class TypeExtractor {
 			GroupReduceFunction.class,
 			0,
 			1,
-			new int[]{0, 0},
 			new int[]{1, 0},
 			inType,
 			functionName,
@@ -323,7 +322,6 @@ public class TypeExtractor {
 			GroupCombineFunction.class,
 			0,
 			1,
-			new int[]{0, 0},
 			new int[]{1, 0},
 			inType,
 			functionName,
@@ -347,8 +345,6 @@ public class TypeExtractor {
 			0,
 			1,
 			2,
-			new int[]{0},
-			new int[]{1},
 			new int[]{2, 0},
 			in1Type,
 			in2Type,
@@ -373,8 +369,6 @@ public class TypeExtractor {
 			0,
 			1,
 			2,
-			new int[]{0},
-			new int[]{1},
 			NO_INDEX,
 			in1Type,
 			in2Type,
@@ -399,8 +393,6 @@ public class TypeExtractor {
 			0,
 			1,
 			2,
-			new int[]{0, 0},
-			new int[]{1, 0},
 			new int[]{2, 0},
 			in1Type,
 			in2Type,
@@ -425,8 +417,6 @@ public class TypeExtractor {
 			0,
 			1,
 			2,
-			new int[]{0},
-			new int[]{1},
 			NO_INDEX,
 			in1Type,
 			in2Type,
@@ -448,7 +438,6 @@ public class TypeExtractor {
 			KeySelector.class,
 			0,
 			1,
-			new int[]{0},
 			NO_INDEX,
 			inType,
 			functionName,
@@ -465,46 +454,16 @@ public class TypeExtractor {
 		Partitioner<T> partitioner,
 		String functionName,
 		boolean allowMissing) {
-		try {
-			final LambdaExecutable exec;
-			try {
-				exec = checkAndExtractLambda(partitioner);
-			} catch (TypeExtractionException e) {
-				throw new InvalidTypesException("Internal error occurred.", e);
-			}
-			if (exec != null) {
-				// check for lambda type erasure
-				validateLambdaGenericParameters(exec);
-
-				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-				// paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure
-				final int paramLen = exec.getParameterTypes().length;
 
-				final Method sam = TypeExtractionUtils.getSingleAbstractMethod(Partitioner.class);
-				// number of parameters the SAM of implemented interface has; the parameter indexing applies to this range
-				final int baseParametersLen = sam.getParameterTypes().length;
-
-				final Type keyType = TypeExtractionUtils.extractTypeFromLambda(
-					exec,
-					new int[]{0},
-					paramLen,
-					baseParametersLen);
-				return new TypeExtractor().privateCreateTypeInfo(keyType, null, null);
-			} else {
-				return new TypeExtractor().privateCreateTypeInfo(
-					Partitioner.class,
-					partitioner.getClass(),
-					0,
-					null,
-					null);
-			}
-		} catch (InvalidTypesException e) {
-			if (allowMissing) {
-				return (TypeInformation<T>) new MissingTypeInfo(functionName != null ? functionName : partitioner.toString(), e);
-			} else {
-				throw e;
-			}
-		}
+		return getUnaryOperatorReturnType(
+			partitioner,
+			Partitioner.class,
+			-1,
+			0,
+			new int[]{0},
+			null,
+			functionName,
+			allowMissing);
 	}
 
 
@@ -524,24 +483,43 @@ public class TypeExtractor {
 	/**
 	 * Returns the unary operator's return type.
 	 *
-	 * <p><b>NOTE:</b> lambda type indices allow extraction of Type from lambdas. To extract input type <b>IN</b>
-	 * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInputTypeArgumentIndices.
+	 * <p>This method can extract a type in 4 different ways:
+	 *
+	 * <p>1. By using the generics of the base class like MyFunction<X, Y, Z, IN, OUT>.
+	 *    This is what outputTypeArgumentIndex (in this example "4") is good for.
+	 *
+	 * <p>2. By using input type inference SubMyFunction<T, String, String, String, T>.
+	 *    This is what inputTypeArgumentIndex (in this example "0") and inType are good for.
+	 *
+	 * <p>3. By using the static method that a compiler generates for Java lambdas.
+	 *    This is what lambdaOutputTypeArgumentIndices is good for. Given that MyFunction has
+	 *    the following single abstract method:
 	 *
 	 * <pre>
 	 * <code>
-	 * OUT apply(Map<String, List<IN>> value)
+	 * void apply(IN value, Collector<OUT> out)
 	 * </code>
 	 * </pre>
 	 *
+	 * <p> Lambda type indices allow the extraction of a type from lambdas. To extract the
+	 *     output type <b>OUT</b> from the function one should pass {@code new int[] {1, 0}}.
+	 *     "1" for selecting the parameter and 0 for the first generic in this type.
+	 *     Use {@code TypeExtractor.NO_INDEX} for selecting the return type of the lambda for
+	 *     extraction or if the class cannot be a lambda because it is not a single abstract
+	 *     method interface.
+	 *
+	 * <p>4. By using interfaces such as {@link TypeInfoFactory} or {@link ResultTypeQueryable}.
+	 *
+	 * <p>See also comments in the header of this class.
+	 *
 	 * @param function Function to extract the return type from
 	 * @param baseClass Base class of the function
-	 * @param inputTypeArgumentIndex Index of input type in the class specification
-	 * @param outputTypeArgumentIndex Index of output type in the class specification
-	 * @param lambdaInputTypeArgumentIndices Table of indices of the type argument specifying the input type. See example.
+	 * @param inputTypeArgumentIndex Index of input generic type in the base class specification (ignored if inType is null)
+	 * @param outputTypeArgumentIndex Index of output generic type in the base class specification
 	 * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the output type. See example.
-	 * @param inType Type of the input elements (In case of an iterable, it is the element type)
+	 * @param inType Type of the input elements (In case of an iterable, it is the element type) or null
 	 * @param functionName Function name
-	 * @param allowMissing Can the type information be missing
+	 * @param allowMissing Can the type information be missing (this generates a MissingTypeInfo for postponing an exception)
 	 * @param <IN> Input type
 	 * @param <OUT> Output type
 	 * @return TypeInformation of the return type of the function
@@ -553,11 +531,23 @@ public class TypeExtractor {
 		Class<?> baseClass,
 		int inputTypeArgumentIndex,
 		int outputTypeArgumentIndex,
-		int[] lambdaInputTypeArgumentIndices,
 		int[] lambdaOutputTypeArgumentIndices,
 		TypeInformation<IN> inType,
 		String functionName,
 		boolean allowMissing) {
+
+		Preconditions.checkArgument(inType == null || inputTypeArgumentIndex >= 0, "Input type argument index was not provided");
+		Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
+		Preconditions.checkArgument(
+			lambdaOutputTypeArgumentIndices != null,
+			"Indices for output type arguments within lambda not provided");
+
+		// explicit result type has highest precedence
+		if (function instanceof ResultTypeQueryable) {
+			return ((ResultTypeQueryable<OUT>) function).getProducedType();
+		}
+
+		// perform extraction
 		try {
 			final LambdaExecutable exec;
 			try {
@@ -566,14 +556,6 @@ public class TypeExtractor {
 				throw new InvalidTypesException("Internal error occurred.", e);
 			}
 			if (exec != null) {
-				Preconditions.checkArgument(
-					lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1,
-					"Indices for input type arguments within lambda not provided");
-				Preconditions.checkArgument(
-					lambdaOutputTypeArgumentIndices != null,
-					"Indices for output type arguments within lambda not provided");
-				// check for lambda type erasure
-				validateLambdaGenericParameters(exec);
 
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
 				// paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure
@@ -584,43 +566,23 @@ public class TypeExtractor {
 				// number of parameters the SAM of implemented interface has; the parameter indexing applies to this range
 				final int baseParametersLen = sam.getParameterTypes().length;
 
-				// executable references "this" implicitly
-				if (paramLen <= 0) {
-					// executable declaring class can also be a super class of the input type
-					// we only validate if the executable exists in input type
-					validateInputContainsExecutable(exec, inType);
-				}
-				else {
-					final Type input = TypeExtractionUtils.extractTypeFromLambda(
-						exec,
-						lambdaInputTypeArgumentIndices,
-						paramLen,
-						baseParametersLen);
-					validateInputType(input, inType);
-				}
-
-				if (function instanceof ResultTypeQueryable) {
-					return ((ResultTypeQueryable<OUT>) function).getProducedType();
-				}
-
 				final Type output;
 				if (lambdaOutputTypeArgumentIndices.length > 0) {
 					output = TypeExtractionUtils.extractTypeFromLambda(
+						baseClass,
 						exec,
 						lambdaOutputTypeArgumentIndices,
 						paramLen,
 						baseParametersLen);
 				} else {
 					output = exec.getReturnType();
+					TypeExtractionUtils.validateLambdaType(baseClass, output);
 				}
 
 				return new TypeExtractor().privateCreateTypeInfo(output, inType, null);
 			} else {
-				Preconditions.checkArgument(inputTypeArgumentIndex >= 0, "Input type argument index was not provided");
-				Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
-				validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType);
-				if(function instanceof ResultTypeQueryable) {
-					return ((ResultTypeQueryable<OUT>) function).getProducedType();
+				if (inType != null) {
+					validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType);
 				}
 				return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, inType, null);
 			}
@@ -637,27 +599,45 @@ public class TypeExtractor {
 	/**
 	 * Returns the binary operator's return type.
 	 *
-	 * <p><b>NOTE:</b> lambda type indices allows extraction of Type from lambdas. To extract input type <b>IN1</b>
-	 * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInput1TypeArgumentIndices.
+	 * <p>This method can extract a type in 4 different ways:
+	 *
+	 * <p>1. By using the generics of the base class like MyFunction<X, Y, Z, IN, OUT>.
+	 *    This is what outputTypeArgumentIndex (in this example "4") is good for.
+	 *
+	 * <p>2. By using input type inference SubMyFunction<T, String, String, String, T>.
+	 *    This is what inputTypeArgumentIndex (in this example "0") and inType are good for.
+	 *
+	 * <p>3. By using the static method that a compiler generates for Java lambdas.
+	 *    This is what lambdaOutputTypeArgumentIndices is good for. Given that MyFunction has
+	 *    the following single abstract method:
 	 *
 	 * <pre>
 	 * <code>
-	 * OUT apply(Map<String, List<IN1>> value1, List<IN2> value2)
+	 * void apply(IN value, Collector<OUT> out)
 	 * </code>
 	 * </pre>
 	 *
+	 * <p> Lambda type indices allow the extraction of a type from lambdas. To extract the
+	 *     output type <b>OUT</b> from the function one should pass {@code new int[] {1, 0}}.
+	 *     "1" for selecting the parameter and 0 for the first generic in this type.
+	 *     Use {@code TypeExtractor.NO_INDEX} for selecting the return type of the lambda for
+	 *     extraction or if the class cannot be a lambda because it is not a single abstract
+	 *     method interface.
+	 *
+	 * <p>4. By using interfaces such as {@link TypeInfoFactory} or {@link ResultTypeQueryable}.
+	 *
+	 * <p>See also comments in the header of this class.
+	 *
 	 * @param function Function to extract the return type from
 	 * @param baseClass Base class of the function
-	 * @param input1TypeArgumentIndex Index of first input type in the class specification
-	 * @param input2TypeArgumentIndex Index of second input type in the class specification
-	 * @param outputTypeArgumentIndex Index of output type in the class specification
-	 * @param lambdaInput1TypeArgumentIndices Table of indices of the type argument specifying the first input type. See example.
-	 * @param lambdaInput2TypeArgumentIndices Table of indices of the type argument specifying the second input type. See example.
+	 * @param input1TypeArgumentIndex Index of first input generic type in the class specification (ignored if in1Type is null)
+	 * @param input2TypeArgumentIndex Index of second input generic type in the class specification (ignored if in2Type is null)
+	 * @param outputTypeArgumentIndex Index of output generic type in the class specification
 	 * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the output type. See example.
 	 * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type)
 	 * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type)
 	 * @param functionName Function name
-	 * @param allowMissing Can the type information be missing
+	 * @param allowMissing Can the type information be missing (this generates a MissingTypeInfo for postponing an exception)
 	 * @param <IN1> Left side input type
 	 * @param <IN2> Right side input type
 	 * @param <OUT> Output type
@@ -671,13 +651,25 @@ public class TypeExtractor {
 		int input1TypeArgumentIndex,
 		int input2TypeArgumentIndex,
 		int outputTypeArgumentIndex,
-		int[] lambdaInput1TypeArgumentIndices,
-		int[] lambdaInput2TypeArgumentIndices,
 		int[] lambdaOutputTypeArgumentIndices,
 		TypeInformation<IN1> in1Type,
 		TypeInformation<IN2> in2Type,
 		String functionName,
 		boolean allowMissing) {
+
+		Preconditions.checkArgument(in1Type == null || input1TypeArgumentIndex >= 0, "Input 1 type argument index was not provided");
+		Preconditions.checkArgument(in2Type == null || input2TypeArgumentIndex >= 0, "Input 2 type argument index was not provided");
+		Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
+		Preconditions.checkArgument(
+			lambdaOutputTypeArgumentIndices != null,
+			"Indices for output type arguments within lambda not provided");
+
+		// explicit result type has highest precedence
+		if (function instanceof ResultTypeQueryable) {
+			return ((ResultTypeQueryable<OUT>) function).getProducedType();
+		}
+
+		// perform extraction
 		try {
 			final LambdaExecutable exec;
 			try {
@@ -686,17 +678,6 @@ public class TypeExtractor {
 				throw new InvalidTypesException("Internal error occurred.", e);
 			}
 			if (exec != null) {
-				Preconditions.checkArgument(
-					lambdaInput1TypeArgumentIndices != null && lambdaInput1TypeArgumentIndices.length >= 1,
-					"Indices for first input type arguments within lambda not provided");
-				Preconditions.checkArgument(
-					lambdaInput2TypeArgumentIndices != null && lambdaInput2TypeArgumentIndices.length >= 1,
-					"Indices for second input type arguments within lambda not provided");
-				Preconditions.checkArgument(
-					lambdaOutputTypeArgumentIndices != null,
-					"Indices for output type arguments within lambda not provided");
-				// check for lambda type erasure
-				validateLambdaGenericParameters(exec);
 
 				final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
 				final int baseParametersLen = sam.getParameterTypes().length;
@@ -704,32 +685,17 @@ public class TypeExtractor {
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
 				final int paramLen = exec.getParameterTypes().length;
 
-				final Type input1 = TypeExtractionUtils.extractTypeFromLambda(
-					exec,
-					lambdaInput1TypeArgumentIndices,
-					paramLen,
-					baseParametersLen);
-				final Type input2 = TypeExtractionUtils.extractTypeFromLambda(
-					exec,
-					lambdaInput2TypeArgumentIndices,
-					paramLen,
-					baseParametersLen);
-
-				validateInputType(input1, in1Type);
-				validateInputType(input2, in2Type);
-				if(function instanceof ResultTypeQueryable) {
-					return ((ResultTypeQueryable<OUT>) function).getProducedType();
-				}
-
 				final Type output;
 				if (lambdaOutputTypeArgumentIndices.length > 0) {
 					output = TypeExtractionUtils.extractTypeFromLambda(
+						baseClass,
 						exec,
 						lambdaOutputTypeArgumentIndices,
 						paramLen,
 						baseParametersLen);
 				} else {
 					output = exec.getReturnType();
+					TypeExtractionUtils.validateLambdaType(baseClass, output);
 				}
 
 				return new TypeExtractor().privateCreateTypeInfo(
@@ -738,13 +704,11 @@ public class TypeExtractor {
 					in2Type);
 			}
 			else {
-				Preconditions.checkArgument(input1TypeArgumentIndex >= 0, "Input 1 type argument index was not provided");
-				Preconditions.checkArgument(input2TypeArgumentIndex >= 0, "Input 2 type argument index was not provided");
-				Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
-				validateInputType(baseClass, function.getClass(), input1TypeArgumentIndex, in1Type);
-				validateInputType(baseClass, function.getClass(), input2TypeArgumentIndex, in2Type);
-				if(function instanceof ResultTypeQueryable) {
-					return ((ResultTypeQueryable<OUT>) function).getProducedType();
+				if (in1Type != null) {
+					validateInputType(baseClass, function.getClass(), input1TypeArgumentIndex, in1Type);
+				}
+				if (in2Type != null) {
+					validateInputType(baseClass, function.getClass(), input2TypeArgumentIndex, in2Type);
 				}
 				return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, in1Type, in2Type);
 			}
@@ -915,9 +879,10 @@ public class TypeExtractor {
 					return typeInfo;
 				} else {
 					throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) t).getName() + "' in '"
-							+ ((TypeVariable<?>) t).getGenericDeclaration() + "' could not be determined. This is most likely a type erasure problem. "
-							+ "The type extraction currently supports types with generic variables only in cases where "
-							+ "all variables in the return type can be deduced from the input type(s).");
+						+ ((TypeVariable<?>) t).getGenericDeclaration() + "' could not be determined. This is most likely a type erasure problem. "
+						+ "The type extraction currently supports types with generic variables only in cases where "
+						+ "all variables in the return type can be deduced from the input type(s). "
+						+ "Otherwise the type has to be specified explicitly using type information.");
 				}
 			}
 		}
@@ -1165,10 +1130,11 @@ public class TypeExtractor {
 				// variable could not be determined
 				if (subTypesInfo[i] == null && !lenient) {
 					throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
-							+ ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
-							+ "' could not be determined. This is most likely a type erasure problem. "
-							+ "The type extraction currently supports types with generic variables only in cases where "
-							+ "all variables in the return type can be deduced from the input type(s).");
+						+ ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
+						+ "' could not be determined. This is most likely a type erasure problem. "
+						+ "The type extraction currently supports types with generic variables only in cases where "
+						+ "all variables in the return type can be deduced from the input type(s). "
+						+ "Otherwise the type has to be specified explicitly using type information.");
 				}
 			} else {
 				// create the type information of the subtype or null/exception
@@ -1618,30 +1584,6 @@ public class TypeExtractor {
 		return fieldCount;
 	}
 
-	private static void validateLambdaGenericParameters(LambdaExecutable exec) {
-		// check the arguments
-		for (Type t : exec.getParameterTypes()) {
-			validateLambdaGenericParameter(t);
-		}
-
-		// check the return type
-		validateLambdaGenericParameter(exec.getReturnType());
-	}
-
-	private static void validateLambdaGenericParameter(Type t) {
-		if(!(t instanceof Class)) {
-			return;
-		}
-		final Class<?> clazz = (Class<?>) t;
-
-		if(clazz.getTypeParameters().length > 0) {
-			throw new InvalidTypesException("The generic type parameters of '" + clazz.getSimpleName() + "' are missing. \n"
-					+ "It seems that your compiler has not stored them into the .class file. \n"
-					+ "Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. \n"
-					+ "See the documentation for more information about how to compile jobs containing lambda expressions.");
-		}
-	}
-
 	/**
 	 * Tries to find a concrete value (Class, ParameterizedType etc. ) for a TypeVariable by traversing the type hierarchy downwards.
 	 * If a value could not be found it will return the most bottom type variable in the hierarchy.

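
Because the refactored extraction above checks ResultTypeQueryable before any lambda analysis ("explicit result type has highest precedence"), a function can always bypass extraction by declaring its own type. A minimal sketch assuming the standard Flink interfaces; SelfTypedMapper is illustrative, not part of this commit:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

// A mapper that declares its produced type itself; getUnaryOperatorReturnType()
// now returns getProducedType() before attempting any reflective analysis.
public class SelfTypedMapper
		implements MapFunction<Integer, Tuple2<Integer, Integer>>,
			ResultTypeQueryable<Tuple2<Integer, Integer>> {

	@Override
	public Tuple2<Integer, Integer> map(Integer value) {
		return Tuple2.of(value, value);
	}

	@Override
	public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
		return Types.TUPLE(Types.INT, Types.INT);
	}
}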
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java
new file mode 100644
index 0000000..1d5cf22
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the type extractor for lambda functions. Many tests only work if the compiler supports
+ * lambdas properly; otherwise a MissingTypeInfo is returned.
+ */
+public class LambdaExtractionTest {
+
+	private static final TypeInformation<Tuple2<Tuple1<Integer>, Boolean>> NESTED_TUPLE_BOOLEAN_TYPE =
+			new TypeHint<Tuple2<Tuple1<Integer>, Boolean>>(){}.getTypeInfo();
+
+	private static final TypeInformation<Tuple2<Tuple1<Integer>, Double>> NESTED_TUPLE_DOUBLE_TYPE =
+			new TypeHint<Tuple2<Tuple1<Integer>, Double>>(){}.getTypeInfo();
+
+	@Test
+	@SuppressWarnings({"Convert2Lambda", "Anonymous2MethodRef"})
+	public void testIdentifyLambdas() throws TypeExtractionException {
+		MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
+			@Override
+			public Integer map(String value) {
+				return Integer.parseInt(value);
+			}
+		};
+
+		MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
+			@Override
+			public Integer map(String value) {
+				return Integer.parseInt(value);
+			}
+		};
+
+		MapFunction<?, ?> fromProperClass = new StaticMapper();
+
+		MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
+			@Override
+			public Tuple2<Integer, Long> map(Integer value) {
+				return new Tuple2<>(value, 1L);
+			}
+		};
+
+		MapFunction<String, Integer> staticLambda = Integer::parseInt;
+		MapFunction<Integer, String> instanceLambda = Object::toString;
+		MapFunction<String, Integer> constructorLambda = Integer::new;
+
+		assertNull(checkAndExtractLambda(anonymousFromInterface));
+		assertNull(checkAndExtractLambda(anonymousFromClass));
+		assertNull(checkAndExtractLambda(fromProperClass));
+		assertNull(checkAndExtractLambda(fromDerived));
+		assertNotNull(checkAndExtractLambda(staticLambda));
+		assertNotNull(checkAndExtractLambda(instanceLambda));
+		assertNotNull(checkAndExtractLambda(constructorLambda));
+		assertNotNull(checkAndExtractLambda(STATIC_LAMBDA));
+	}
+
+	private static class StaticMapper implements MapFunction<String, Integer> {
+		@Override
+		public Integer map(String value) {
+			return Integer.parseInt(value);
+		}
+	}
+
+	private interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
+		@Override
+		Tuple2<T, Long> map(T value) throws Exception;
+	}
+
+	private static final MapFunction<String, Integer> STATIC_LAMBDA = Integer::parseInt;
+
+	private static class MyClass {
+		private String s = "mystring";
+
+		public MapFunction<Integer, String> getMapFunction() {
+			return (i) -> s;
+		}
+	}
+
+	@Test
+	public void testLambdaWithMemberVariable() {
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), Types.INT);
+		assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
+	}
+
+	@Test
+	public void testLambdaWithLocalVariable() {
+		String s = "mystring";
+		final int k = 24;
+		int j = 26;
+
+		MapFunction<Integer, String> f = (i) -> s + k + j;
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, Types.INT);
+		assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
+	}
+
+	@Test
+	public void testLambdaWithNonGenericResultType() {
+		MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Boolean> f = (i) -> null;
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, null, true);
+		assertTrue(ti instanceof BasicTypeInfo);
+		assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
+	}
+
+	@Test
+	public void testMapLambda() {
+		MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@Test
+	public void testFlatMapLambda() {
+		FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, out) -> out.collect(null);
+
+		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@Test
+	public void testMapPartitionLambda() {
+		MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
+
+		TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@Test
+	public void testJoinLambda() {
+		JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
+
+		TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@Test
+	public void testCoGroupLambda() {
+		CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
+
+		TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@Test
+	public void testKeySelectorLambda() {
+		KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
+
+		TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testLambdaTypeErasure() {
+		MapFunction<Tuple1<Integer>, Tuple1> f = (i) -> null;
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, new TypeHint<Tuple1<Integer>>(){}.getTypeInfo(), null, true);
+		assertTrue(ti instanceof MissingTypeInfo);
+	}
+
+	@Test
+	public void testPartitionerLambda() {
+		Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % numPartitions;
+		final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner, null, true);
+
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertEquals(BasicTypeInfo.INT_TYPE_INFO, ((TupleTypeInfo<?>) ti).getTypeAt(0));
+			assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ((TupleTypeInfo<?>) ti).getTypeAt(1));
+		}
+	}
+
+	private static class MyType {
+		private int key;
+
+		public int getKey() {
+			return key;
+		}
+
+		public void setKey(int key) {
+			this.key = key;
+		}
+
+		protected int getKey2() {
+			return 0;
+		}
+	}
+
+	@Test
+	public void testInstanceMethodRefSameType() {
+		MapFunction<MyType, Integer> f = MyType::getKey;
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MyType.class));
+		assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+	}
+
+	@Test
+	public void testInstanceMethodRefSuperType() {
+		MapFunction<Integer, String> f = Object::toString;
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.INT_TYPE_INFO);
+		assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
+	}
+
+	private static class MySubtype extends MyType {
+		public boolean test;
+	}
+
+	@Test
+	public void testInstanceMethodRefSuperTypeProtected() {
+		MapFunction<MySubtype, Integer> f = MyType::getKey2;
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MySubtype.class));
+		assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+	}
+
+	@Test
+	public void testConstructorMethodRef() {
+		MapFunction<String, Integer> f = Integer::new;
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.STRING_TYPE_INFO);
+		assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+	}
+
+	private interface InterfaceWithDefaultMethod {
+		void samMethod();
+
+		default void defaultMethod() {
+
+		}
+	}
+
+	@Test
+	public void testSamMethodExtractionInterfaceWithDefaultMethod() {
+		final Method sam = TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithDefaultMethod.class);
+		assertNotNull(sam);
+		assertEquals("samMethod", sam.getName());
+	}
+
+	private interface InterfaceWithMultipleMethods {
+		void firstMethod();
+
+		void secondMethod();
+	}
+
+	@Test(expected = InvalidTypesException.class)
+	public void testSingleAbstractMethodMultipleMethods() {
+		TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithMultipleMethods.class);
+	}
+
+	private interface InterfaceWithoutAbstractMethod {
+		default void defaultMethod() {
+
+		}
+	}
+
+	@Test(expected = InvalidTypesException.class)
+	public void testSingleAbstractMethodNoAbstractMethods() {
+		TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithoutAbstractMethod.class);
+	}
+
+	private abstract class AbstractClassWithSingleAbstractMethod {
+		public abstract void abstractMethod();
+	}
+
+	@Test(expected = InvalidTypesException.class)
+	public void testSingleAbstractMethodNotAnInterface() {
+		TypeExtractionUtils.getSingleAbstractMethod(AbstractClassWithSingleAbstractMethod.class);
+	}
+}
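
The erasure behavior asserted in testLambdaTypeErasure above is the reason the commit keeps an explicit-TypeInformation fallback in the APIs. The following is a minimal sketch of that failure mode and the fallback, assuming the same TypeExtractor methods the tests exercise; the class itself is illustrative and not part of this commit:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.MissingTypeInfo;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    public class ErasureFallbackSketch {
        public static void main(String[] args) {
            // The lambda compiles to a synthetic method returning raw Tuple2,
            // so its type arguments cannot be recovered by reflection.
            MapFunction<String, Tuple2<String, Integer>> f = s -> new Tuple2<>(s, 1);

            // Lenient extraction (last argument true) degrades to MissingTypeInfo
            // instead of throwing, as testLambdaTypeErasure asserts.
            TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
                    f, BasicTypeInfo.STRING_TYPE_INFO, null, true);
            System.out.println(ti instanceof MissingTypeInfo); // expected: true

            // The fallback users supply instead, e.g. via returns(...):
            TypeInformation<Tuple2<String, Integer>> explicit =
                    new TypeHint<Tuple2<String, Integer>>(){}.getTypeInfo();
            System.out.println(explicit);
        }
    }

Deferring the error this way is what lets the APIs wait for a type hint before failing, as the example changes below show.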

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
index c585e82..0874999 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.examples.java.relational;
 
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
@@ -118,33 +117,18 @@ public class TPCHQuery10 {
 		// orders filtered by year: (orderkey, custkey)
 		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
 				// filter by year
-				orders.filter(
-								new FilterFunction<Tuple3<Integer, Integer, String>>() {
-									@Override
-									public boolean filter(Tuple3<Integer, Integer, String> o) {
-										return Integer.parseInt(o.f2.substring(0, 4)) > 1990;
-									}
-								})
+				orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
 				// project fields out that are no longer required
 				.project(0, 1);
 
 		// lineitems filtered by flag: (orderkey, revenue)
 		DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag =
 				// filter by flag
-				lineitems.filter(new FilterFunction<Tuple4<Integer, Double, Double, String>>() {
-										@Override
-										public boolean filter(Tuple4<Integer, Double, Double, String> l) {
-											return l.f3.equals("R");
-										}
-								})
+				lineitems.filter(lineitem -> lineitem.f3.equals("R"))
 				// compute revenue and project out return flag
-				.map(new MapFunction<Tuple4<Integer, Double, Double, String>, Tuple2<Integer, Double>>() {
-							@Override
-							public Tuple2<Integer, Double> map(Tuple4<Integer, Double, Double, String> l) {
-								// revenue per item = l_extendedprice * (1 - l_discount)
-								return new Tuple2<Integer, Double>(l.f0, l.f1 * (1 - l.f2));
-							}
-					});
+				// revenue per item = l_extendedprice * (1 - l_discount)
+				.map(lineitem -> new Tuple2<>(lineitem.f0, lineitem.f1 * (1 - lineitem.f2)))
+				.returns(Types.TUPLE(Types.INT, Types.DOUBLE)); // explicit result type: the lambda's generic return type is erased
 
 		// join orders with lineitems: (custkey, revenue)
 		DataSet<Tuple2<Integer, Double>> revenueByCustomer =
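
The returns(...) hint added above is that fallback in action: the map lambda compiles to a method returning raw Tuple2, so the element type has to be declared. A self-contained sketch of the same pattern on a toy DataSet (hypothetical pipeline, not taken from this commit):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class ReturnsHintSketch {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Integer, Double>> discounted = env.fromElements(1, 2, 3)
                    // the lambda's Tuple2<Integer, Double> is erased at compile time ...
                    .map(i -> new Tuple2<>(i, i * 0.9))
                    // ... so the result type is declared explicitly, as in TPCHQuery10
                    .returns(Types.TUPLE(Types.INT, Types.DOUBLE));

            discounted.print();
        }
    }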

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index c494c6f..2882fc7 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -43,7 +43,6 @@ import org.apache.flink.util.Collector;
  * </ul>
  *
  */
-@SuppressWarnings("serial")
 public class WordCount {
 
 	// *************************************************************************
@@ -110,7 +109,7 @@ public class WordCount {
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
+					out.collect(new Tuple2<>(token, 1));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index 9c7c435..ad34d71 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -72,10 +72,10 @@ public class WordCount {
 		}
 
 		DataStream<Tuple2<String, Integer>> counts =
-		// split up the lines in pairs (2-tuples) containing: (word,1)
-		text.flatMap(new Tokenizer())
-		// group by the tuple field "0" and sum up tuple field "1"
-				.keyBy(0).sum(1);
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			text.flatMap(new Tokenizer())
+			// group by the tuple field "0" and sum up tuple field "1"
+			.keyBy(0).sum(1);
 
 		// emit result
 		if (params.has("output")) {
@@ -100,18 +100,16 @@ public class WordCount {
 	 * Integer>}).
 	 */
 	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
-				throws Exception {
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
 			// normalize and split the line
 			String[] tokens = value.toLowerCase().split("\\W+");
 
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
+					out.collect(new Tuple2<>(token, 1));
 				}
 			}
 		}
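
Tokenizer remains a named class above, which preserves full generic information without a hint. A lambda variant would need an explicit result type, because the Collector's type argument is erased; a hypothetical sketch, not part of this commit:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class LambdaWordCountSketch {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<String, Integer>> counts = env
                    .fromElements("to be or not to be")
                    .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                        for (String token : line.toLowerCase().split("\\W+")) {
                            if (token.length() > 0) {
                                out.collect(new Tuple2<>(token, 1));
                            }
                        }
                    })
                    // Collector<Tuple2<String, Integer>> is erased, hence the hint
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(0)
                    .sum(1);

            counts.print();
            env.execute("Lambda WordCount Sketch");
        }
    }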

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
deleted file mode 100644
index 62504f9..0000000
--- a/flink-java8/pom.xml
+++ /dev/null
@@ -1,225 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-parent</artifactId>
-		<version>1.7-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-java8_${scala.binary.version}</artifactId>
-	<name>flink-java8</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-
-		<!-- core dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<!-- test dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-cep_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<!-- just define the Java version to be used for compiling and plugins -->
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.1</version><!--$NO-MVN-MAN-VER$-->
-				<configuration>
-					<source>1.8</source>
-					<target>1.8</target>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<systemPropertyVariables>
-						<log.level>WARN</log.level>
-					</systemPropertyVariables>
-				</configuration>
-			</plugin>
-
-			<!-- get default data from flink-examples-batch package -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-dependency-plugin</artifactId>
-				<version>2.9</version><!--$NO-MVN-MAN-VER$-->
-				<executions>
-					<execution>
-						<id>unpack</id>
-						<phase>prepare-package</phase>
-						<goals>
-							<goal>unpack</goal>
-						</goals>
-						<configuration>
-							<artifactItems>
-								<artifactItem>
-									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
-									<version>${project.version}</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
-								</artifactItem>
-							</artifactItems>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	
-		<pluginManagement>
-			<plugins>
-				<plugin>
-					<!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
-					<artifactId>maven-compiler-plugin</artifactId>
-					<configuration>
-						<source>1.8</source>
-						<target>1.8</target>
-						<compilerId>jdt</compilerId>
-					</configuration>
-					<dependencies>
-						<!-- This dependency provides the implementation of compiler "jdt": -->
-						<dependency>
-							<groupId>org.eclipse.tycho</groupId>
-							<artifactId>tycho-compiler-jdt</artifactId>
-							<version>0.21.0</version>
-						</dependency>
-					</dependencies>
-				</plugin>
-				<plugin>
-					<!-- Skip the deployment of the Java8 artifact -->
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-deploy-plugin</artifactId>
-					<version>2.4</version><!--$NO-MVN-MAN-VER$-->
-					<configuration>
-						<skip>true</skip>
-					</configuration>
-				</plugin>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-assembly-plugin</artifactId>
-										<versionRange>[2.4,)</versionRange>
-										<goals>
-											<goal>single</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-compiler-plugin</artifactId>
-										<versionRange>[3.1,)</versionRange>
-										<goals>
-											<goal>testCompile</goal>
-											<goal>compile</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-dependency-plugin</artifactId>
-										<versionRange>[2.9,)</versionRange>
-										<goals>
-											<goal>unpack</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
deleted file mode 100644
index c0fce4d..0000000
--- a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java8.relational;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-
-/**
- * This program implements a modified version of the TPC-H query 10.
- * The original query can be found at
- * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
- *
- * <p>This program implements the following SQL equivalent:
- *
- * <p><pre>{@code
- * SELECT
- *        c_custkey,
- *        c_name,
- *        c_address,
- *        n_name,
- *        c_acctbal
- *        SUM(l_extendedprice * (1 - l_discount)) AS revenue,
- * FROM
- *        customer,
- *        orders,
- *        lineitem,
- *        nation
- * WHERE
- *        c_custkey = o_custkey
- *        AND l_orderkey = o_orderkey
- *        AND YEAR(o_orderdate) > '1990'
- *        AND l_returnflag = 'R'
- *        AND c_nationkey = n_nationkey
- * GROUP BY
- *        c_custkey,
- *        c_name,
- *        c_acctbal,
- *        n_name,
- *        c_address
- * }</pre>
- *
- * <p>Compared to the original TPC-H query this version does not print
- * c_phone and c_comment, only filters by years greater than 1990 instead of
- * a period of 3 months, and does not sort the result by revenue.
- *
- * <p>Input files are plain text CSV files using the pipe character ('|') as field separator
- * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- *
- * <p>Usage: <code>TPCHQuery10 &lt;customer-csv path&gt; &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;nation-csv path&gt; &lt;result path&gt;</code><br>
- *
- * <p>This example shows how to use:
- * <ul>
- * <li> inline-defined functions using Java 8 Lambda Expressions
- * </ul>
- */
-public class TPCHQuery10 {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get customer data set: (custkey, name, address, nationkey, acctbal)
-		DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
-
-		// get orders data set: (orderkey, custkey, orderdate)
-		DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
-
-		// get lineitem data set: (orderkey, extendedprice, discount, returnflag)
-		DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
-
-		// get nation data set: (nationkey, name)
-		DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
-
-		// orders filtered by year: (orderkey, custkey)
-		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
-				// filter by year
-				orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
-				// project fields out that are no longer required
-				.project(0, 1);
-
-		// lineitems filtered by flag: (orderkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag =
-				// filter by flag
-				lineitems.filter(lineitem -> lineitem.f3.equals("R"))
-				// project fields out that are no longer required
-				.project(0, 1, 2);
-
-		// join orders with lineitems: (custkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey =
-				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
-									.where(0).equalTo(0)
-									.projectFirst(1).projectSecond(1, 2);
-
-		// aggregate for revenue: (custkey, revenue)
-		DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
-				// calculate the revenue for each item
-				// revenue per item = l_extendedprice * (1 - l_discount)
-				.map(i -> new Tuple2<>(i.f0, i.f1 * (1 - i.f2)))
-				// aggregate the revenues per item to revenue per customer
-				.groupBy(0).sum(1);
-
-		// join customer with nation (custkey, name, address, nationname, acctbal)
-		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
-						.joinWithTiny(nations)
-						.where(3).equalTo(0)
-						.projectFirst(0, 1, 2).projectSecond(1).projectFirst(4);
-
-		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
-		DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue =
-				customerWithNation.join(revenueOfCustomerKey)
-				.where(0).equalTo(0)
-				.projectFirst(0, 1, 2, 3, 4).projectSecond(1);
-
-		// emit result
-		customerWithRevenue.writeAsCsv(outputPath);
-
-		// execute program
-		env.execute("TPCH Query 10 Example");
-
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static String customerPath;
-	private static String ordersPath;
-	private static String lineitemPath;
-	private static String nationPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] programArguments) {
-
-		if (programArguments.length > 0) {
-			if (programArguments.length == 5) {
-				customerPath = programArguments[0];
-				ordersPath = programArguments[1];
-				lineitemPath = programArguments[2];
-				nationPath = programArguments[3];
-				outputPath = programArguments[4];
-			} else {
-				System.err.println("Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
-				return false;
-			}
-		} else {
-			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-								"  Due to legal restrictions, we can not ship generated data.\n" +
-								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
-								"  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
-			return false;
-		}
-		return true;
-	}
-
-	private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(customerPath)
-					.fieldDelimiter("|")
-					.includeFields("11110100")
-					.types(Integer.class, String.class, String.class, Integer.class, Double.class);
-	}
-
-	private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(ordersPath)
-					.fieldDelimiter("|")
-					.includeFields("110010000")
-					.types(Integer.class, Integer.class, String.class);
-	}
-
-	private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(lineitemPath)
-					.fieldDelimiter("|")
-					.includeFields("1000011010000000")
-					.types(Integer.class, Double.class, Double.class, String.class);
-	}
-
-	private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(nationPath)
-					.fieldDelimiter("|")
-					.includeFields("1100")
-					.types(Integer.class, String.class);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
deleted file mode 100644
index 8f36f66..0000000
--- a/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java8.wordcount;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.util.Collector;
-
-import java.util.Arrays;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram
- * over text files.
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>write a compact Flink program with Java 8 Lambda Expressions.
- * </ul>
- *
- */
-public class WordCount {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataSet<String> text = getTextDataSet(env);
-
-		DataSet<Tuple2<String, Integer>> counts =
-				// normalize and split each line
-				text.map(line -> line.toLowerCase().split("\\W+"))
-				// convert split line in pairs (2-tuples) containing: (word,1)
-				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
-					// emit the pairs with non-zero-length words
-					Arrays.stream(tokens)
-					.filter(t -> t.length() > 0)
-					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
-				})
-				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0)
-				.sum(1);
-
-		// emit result
-		if (fileOutput) {
-			counts.writeAsCsv(outputPath, "\n", " ");
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("WordCount Example");
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return WordCountData.getDefaultTextLineDataSet(env);
-		}
-	}
-}