You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/20 06:07:20 UTC

[1/3] flink git commit: [FLINK-7251] [types] Remove the flink-java8 module and improve lambda type extraction

Repository: flink
Updated Branches:
  refs/heads/master 95eadfe15 -> ddba1b69f


http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
index 8f8016e..f5d07de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,9 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
+import org.apache.flink.runtime.util.jartestprogram.FilterWithIndirection;
+import org.apache.flink.runtime.util.jartestprogram.FilterWithLambda;
+import org.apache.flink.runtime.util.jartestprogram.FilterWithMethodReference;
 import org.apache.flink.runtime.util.jartestprogram.WordCountWithAnonymousClass;
 import org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass;
 import org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass2;
@@ -28,6 +30,7 @@ import org.apache.flink.runtime.util.jartestprogram.AnonymousInNonStaticMethod;
 import org.apache.flink.runtime.util.jartestprogram.AnonymousInNonStaticMethod2;
 import org.apache.flink.runtime.util.jartestprogram.NestedAnonymousInnerClass;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import java.io.File;
 import java.io.FileInputStream;
@@ -37,7 +40,6 @@ import java.util.Set;
 import java.util.jar.JarInputStream;
 import java.util.zip.ZipEntry;
 
-
 public class JarFileCreatorTest {
 
 	//anonymous inner class in static method accessing a local variable in its closure.
@@ -48,14 +50,14 @@ public class JarFileCreatorTest {
 		jfc.addClass(AnonymousInStaticMethod.class)
 			.createJarFile();
 
-		Set<String> ans = new HashSet<String>();
+		Set<String> ans = new HashSet<>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod$1.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod$A.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod.class");
 
 		Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out));
 
-		out.delete();
+		Assert.assertTrue(out.delete());
 	}
 
 	//anonymous inner class in non static method accessing a local variable in its closure.
@@ -66,14 +68,14 @@ public class JarFileCreatorTest {
 		jfc.addClass(AnonymousInNonStaticMethod.class)
 			.createJarFile();
 
-		Set<String> ans = new HashSet<String>();
+		Set<String> ans = new HashSet<>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod$1.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod$A.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod.class");
 
 		Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out));
 
-		out.delete();
+		Assert.assertTrue(out.delete());
 	}
 
 	//anonymous inner class in non static method accessing a field of its enclosing class.
@@ -84,14 +86,14 @@ public class JarFileCreatorTest {
 		jfc.addClass(AnonymousInNonStaticMethod2.class)
 			.createJarFile();
 
-		Set<String> ans = new HashSet<String>();
+		Set<String> ans = new HashSet<>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2$1.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2$A.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2.class");
 
 		Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out));
 
-		out.delete();
+		Assert.assertTrue(out.delete());
 	}
 
 	//anonymous inner class in an anonymous inner class accessing a field of the outermost enclosing class.
@@ -102,7 +104,7 @@ public class JarFileCreatorTest {
 		jfc.addClass(NestedAnonymousInnerClass.class)
 			.createJarFile();
 
-		Set<String> ans = new HashSet<String>();
+		Set<String> ans = new HashSet<>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1$1.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1.class");
@@ -110,7 +112,54 @@ public class JarFileCreatorTest {
 
 		Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out));
 
-		out.delete();
+		Assert.assertTrue(out.delete());
+	}
+
+	@Ignore // this is currently not supported (see FLINK-9520)
+	@Test
+	public void testFilterWithMethodReference() throws Exception {
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
+		JarFileCreator jfc = new JarFileCreator(out);
+		jfc.addClass(FilterWithMethodReference.class)
+			.createJarFile();
+
+		Set<String> ans = new HashSet<>();
+		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
+
+		Assert.assertTrue("Jar file for Java 8 method reference is not correct", validate(ans, out));
+		Assert.assertTrue(out.delete());
+	}
+
+	@Test
+	public void testFilterWithLambda() throws Exception{
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
+		JarFileCreator jfc = new JarFileCreator(out);
+		jfc.addClass(FilterWithLambda.class)
+			.createJarFile();
+
+		Set<String> ans = new HashSet<>();
+		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
+
+		Assert.assertTrue("Jar file for Java 8 lambda is not correct", validate(ans, out));
+		Assert.assertTrue(out.delete());
+	}
+
+	@Test
+	public void testFilterWithIndirection() throws Exception {
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
+		JarFileCreator jfc = new JarFileCreator(out);
+		jfc.addClass(FilterWithIndirection.class)
+			.createJarFile();
+
+		Set<String> ans = new HashSet<>();
+		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper$UtilFunction.class");
+
+		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
+		Assert.assertTrue(out.delete());
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -123,14 +172,14 @@ public class JarFileCreatorTest {
 		jfc.addClass(WordCountWithExternalClass.class)
 			.createJarFile();
 
-		Set<String> ans = new HashSet<String>();
+		Set<String> ans = new HashSet<>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.class");
 
 		Assert.assertTrue("Jar file for External Class is not correct", validate(ans, out));
 
-		out.delete();
+		Assert.assertTrue(out.delete());
 	}
 
 	@Test
@@ -140,14 +189,14 @@ public class JarFileCreatorTest {
 		jfc.addClass(WordCountWithInnerClass.class)
 			.createJarFile();
 
-		Set<String> ans = new HashSet<String>();
+		Set<String> ans = new HashSet<>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class");
 
 		Assert.assertTrue("Jar file for Inner Class is not correct", validate(ans, out));
 
-		out.delete();
+		Assert.assertTrue(out.delete());
 	}
 
 	@Test
@@ -157,14 +206,14 @@ public class JarFileCreatorTest {
 		jfc.addClass(WordCountWithAnonymousClass.class)
 			.createJarFile();
 
-		Set<String> ans = new HashSet<String>();
+		Set<String> ans = new HashSet<>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass$1.class");
 
 		Assert.assertTrue("Jar file for Anonymous Class is not correct", validate(ans, out));
 
-		out.delete();
+		Assert.assertTrue(out.delete());
 	}
 
 	@Test
@@ -174,7 +223,7 @@ public class JarFileCreatorTest {
 		jfc.addClass(WordCountWithExternalClass2.class)
 			.createJarFile();
 
-		Set<String> ans = new HashSet<String>();
+		Set<String> ans = new HashSet<>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.class");
@@ -182,7 +231,7 @@ public class JarFileCreatorTest {
 
 		Assert.assertTrue("Jar file for Extend Identifier is not correct", validate(ans, out));
 
-		out.delete();
+		Assert.assertTrue(out.delete());
 	}
 
 	@Test
@@ -193,7 +242,7 @@ public class JarFileCreatorTest {
 			.addPackage("org.apache.flink.util")
 			.createJarFile();
 
-		Set<String> ans = new HashSet<String>();
+		Set<String> ans = new HashSet<>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class");
@@ -201,7 +250,7 @@ public class JarFileCreatorTest {
 
 		Assert.assertTrue("Jar file for UDF package is not correct", validate(ans, out));
 
-		out.delete();
+		Assert.assertTrue(out.delete());
 	}
 
 	private boolean validate(Set<String> expected, File out) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java
new file mode 100644
index 0000000..12026e9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithIndirection.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Filter with additional indirections.
+ */
+public class FilterWithIndirection {
+
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
+
+		DataSet<String> output = input.filter(UtilFunctionWrapper.UtilFunction.getWordFilter());
+		output.print();
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java
new file mode 100644
index 0000000..ffa5756
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithLambda.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Filter with lambda that is directly passed to {@link DataSet#filter(FilterFunction)}.
+ */
+public class FilterWithLambda {
+
+	@SuppressWarnings("Convert2MethodRef")
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
+
+		DataSet<String> output = input.filter((v) -> WordFilter.filter(v));
+		output.print();
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java
new file mode 100644
index 0000000..ddb76b4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterWithMethodReference.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * A lambda filter using a static method.
+ */
+public class FilterWithMethodReference {
+
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
+
+		FilterFunction<String> filter = WordFilter::filter;
+
+		DataSet<String> output = input.filter(filter);
+		output.print();
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
new file mode 100644
index 0000000..89fca0d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * Static factory for a lambda filter function.
+ */
+public class UtilFunction {
+
+	@SuppressWarnings("Convert2MethodRef")
+	public static FilterFunction<String> getWordFilter() {
+		return (v) -> WordFilter.filter(v);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
new file mode 100644
index 0000000..498c4cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A wrapper around {@link WordFilter} to introduce additional indirection.
+ */
+public class UtilFunctionWrapper {
+
+	/**
+	 * Static factory for a lambda filter function.
+	 */
+	public static class UtilFunction {
+		@SuppressWarnings("Convert2MethodRef")
+		public static FilterFunction<String> getWordFilter() {
+			return (v) -> WordFilter.filter(v);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
new file mode 100644
index 0000000..2d072cb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+/**
+ * Static filter method for lambda tests.
+ */
+public class WordFilter {
+
+	public static boolean filter(String value) {
+		return !value.contains("not");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 7fb3822..3f935e3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -521,7 +521,6 @@ public class AllWindowedStream<T, W extends Window> {
 			AllWindowFunction.class,
 			0,
 			1,
-			new int[]{1, 0},
 			new int[]{2, 0},
 			inType,
 			null,
@@ -537,7 +536,6 @@ public class AllWindowedStream<T, W extends Window> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			TypeExtractor.NO_INDEX,
 			inType,
 			null,
 			false);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
index cb18a3f..a3e28ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
@@ -72,7 +72,6 @@ public class AsyncDataStream {
 			AsyncFunction.class,
 			0,
 			1,
-			new int[]{0},
 			new int[]{1, 0},
 			in.getType(),
 			Utils.getCallLocationName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
index cb7d8c9..30047cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
@@ -131,8 +131,6 @@ public class BroadcastConnectedStream<IN1, IN2> {
 				2,
 				3,
 				TypeExtractor.NO_INDEX,
-				TypeExtractor.NO_INDEX,
-				TypeExtractor.NO_INDEX,
 				getType1(),
 				getType2(),
 				Utils.getCallLocationName(),
@@ -183,8 +181,6 @@ public class BroadcastConnectedStream<IN1, IN2> {
 				1,
 				2,
 				TypeExtractor.NO_INDEX,
-				TypeExtractor.NO_INDEX,
-				TypeExtractor.NO_INDEX,
 				getType1(),
 				getType2(),
 				Utils.getCallLocationName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index c2ebdf4..55009e1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -95,9 +96,24 @@ public class CoGroupedStreams<T1, T2> {
 
 	/**
 	 * Specifies a {@link KeySelector} for elements from the first input.
+	 *
+	 * @param keySelector The KeySelector to be used for extracting the first input's key for partitioning.
 	 */
 	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
-		TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		Preconditions.checkNotNull(keySelector);
+		final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		return where(keySelector, keyType);
+	}
+
+	/**
+	 * Specifies a {@link KeySelector} for elements from the first input with explicit type information.
+	 *
+	 * @param keySelector The KeySelector to be used for extracting the first input's key for partitioning.
+	 * @param keyType The type information describing the key type.
+	 */
+	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType)  {
+		Preconditions.checkNotNull(keySelector);
+		Preconditions.checkNotNull(keyType);
 		return new Where<>(input1.clean(keySelector), keyType);
 	}
 
@@ -121,12 +137,28 @@ public class CoGroupedStreams<T1, T2> {
 
 		/**
 		 * Specifies a {@link KeySelector} for elements from the second input.
+		 *
+		 * @param keySelector The KeySelector to be used for extracting the second input's key for partitioning.
 		 */
 		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
-			TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
-			if (!otherKey.equals(this.keyType)) {
+			Preconditions.checkNotNull(keySelector);
+			final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+			return equalTo(keySelector, otherKey);
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the second input with explicit type information for the key type.
+		 *
+		 * @param keySelector The KeySelector to be used for extracting the key for partitioning.
+		 * @param keyType The type information describing the key type.
+		 */
+		public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType)  {
+			Preconditions.checkNotNull(keySelector);
+			Preconditions.checkNotNull(keyType);
+
+			if (!keyType.equals(this.keyType)) {
 				throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
-						"first key = " + this.keyType + " , second key = " + otherKey);
+						"first key = " + this.keyType + " , second key = " + keyType);
 			}
 
 			return new EqualTo(input2.clean(keySelector));

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index e244bd2..0ada54a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -192,6 +192,28 @@ public class ConnectedStreams<IN1, IN2> {
 	}
 
 	/**
+	 * KeyBy operation for connected data stream. Assigns keys to the elements of
+	 * input1 and input2 using keySelector1 and keySelector2 with explicit type information
+	 * for the common key type.
+	 *
+	 * @param keySelector1
+	 *            The {@link KeySelector} used for grouping the first input
+	 * @param keySelector2
+	 *            The {@link KeySelector} used for grouping the second input
+	 * @param keyType The type information of the common key type.
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public <KEY> ConnectedStreams<IN1, IN2> keyBy(
+			KeySelector<IN1, KEY> keySelector1,
+			KeySelector<IN2, KEY> keySelector2,
+			TypeInformation<KEY> keyType) {
+		return new ConnectedStreams<>(
+			environment,
+			inputStream1.keyBy(keySelector1, keyType),
+			inputStream2.keyBy(keySelector2, keyType));
+	}
+
+	/**
 	 * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
 	 * the output to a common type. The transformation calls a
 	 * {@link CoMapFunction#map1} for each element of the first input and
@@ -210,8 +232,6 @@ public class ConnectedStreams<IN1, IN2> {
 			1,
 			2,
 			TypeExtractor.NO_INDEX,
-			TypeExtractor.NO_INDEX,
-			TypeExtractor.NO_INDEX,
 			getType1(),
 			getType2(),
 			Utils.getCallLocationName(),
@@ -244,8 +264,6 @@ public class ConnectedStreams<IN1, IN2> {
 			1,
 			2,
 			TypeExtractor.NO_INDEX,
-			TypeExtractor.NO_INDEX,
-			TypeExtractor.NO_INDEX,
 			getType1(),
 			getType2(),
 			Utils.getCallLocationName(),
@@ -281,8 +299,6 @@ public class ConnectedStreams<IN1, IN2> {
 			1,
 			2,
 			TypeExtractor.NO_INDEX,
-			TypeExtractor.NO_INDEX,
-			TypeExtractor.NO_INDEX,
 			getType1(),
 			getType2(),
 			Utils.getCallLocationName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 78ba8e4..8e24ad7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -286,10 +286,25 @@ public class DataStream<T> {
 	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
 	 */
 	public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
+		Preconditions.checkNotNull(key);
 		return new KeyedStream<>(this, clean(key));
 	}
 
 	/**
+	 * It creates a new {@link KeyedStream} that uses the provided key with explicit type information
+	 * for partitioning its operator states.
+	 *
+	 * @param key The KeySelector to be used for extracting the key for partitioning.
+	 * @param keyType The type information describing the key type.
+	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
+	 */
+	public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
+		Preconditions.checkNotNull(key);
+		Preconditions.checkNotNull(keyType);
+		return new KeyedStream<>(this, clean(key), keyType);
+	}
+
+	/**
 	 * Partitions the operator state of a {@link DataStream} by the given key positions.
 	 *
 	 * @param fields
@@ -621,7 +636,6 @@ public class DataStream<T> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			TypeExtractor.NO_INDEX,
 			getType(),
 			Utils.getCallLocationName(),
 			true);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
index f614ab0..3d22275 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -206,5 +206,9 @@ public class IterativeStream<T> extends SingleOutputStreamOperator<T> {
 			throw groupingException;
 		}
 
+		@Override
+		public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> keySelector1, KeySelector<F, KEY> keySelector2, TypeInformation<KEY> keyType) {
+			throw groupingException;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index 088fab9..bb67c09 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -79,9 +79,24 @@ public class JoinedStreams<T1, T2> {
 
 	/**
 	 * Specifies a {@link KeySelector} for elements from the first input.
+	 *
+	 * @param keySelector The KeySelector to be used for extracting the key for partitioning.
 	 */
 	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
-		TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		requireNonNull(keySelector);
+		final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		return where(keySelector, keyType);
+	}
+
+	/**
+	 * Specifies a {@link KeySelector} for elements from the first input with explicit type information for the key type.
+	 *
+	 * @param keySelector The KeySelector to be used for extracting the first input's key for partitioning.
+	 * @param keyType The type information describing the key type.
+	 */
+	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType)  {
+		requireNonNull(keySelector);
+		requireNonNull(keyType);
 		return new Where<>(input1.clean(keySelector), keyType);
 	}
 
@@ -105,12 +120,28 @@ public class JoinedStreams<T1, T2> {
 
 		/**
 		 * Specifies a {@link KeySelector} for elements from the second input.
+		 *
+		 * @param keySelector The KeySelector to be used for extracting the second input's key for partitioning.
 		 */
 		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
-			TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
-			if (!otherKey.equals(this.keyType)) {
+			requireNonNull(keySelector);
+			final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+			return equalTo(keySelector, otherKey);
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the second input with explicit type information for the key type.
+		 *
+		 * @param keySelector The KeySelector to be used for extracting the second input's key for partitioning.
+		 * @param keyType The type information describing the key type.
+		 */
+		public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType)  {
+			requireNonNull(keySelector);
+			requireNonNull(keyType);
+
+			if (!keyType.equals(this.keyType)) {
 				throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
-						"first key = " + this.keyType + " , second key = " + otherKey);
+						"first key = " + this.keyType + " , second key = " + keyType);
 			}
 
 			return new EqualTo(input2.clean(keySelector));
@@ -226,8 +257,6 @@ public class JoinedStreams<T1, T2> {
 				0,
 				1,
 				2,
-				new int[]{0},
-				new int[]{1},
 				TypeExtractor.NO_INDEX,
 				input1.getType(),
 				input2.getType(),
@@ -309,8 +338,6 @@ public class JoinedStreams<T1, T2> {
 				0,
 				1,
 				2,
-				new int[]{0},
-				new int[]{1},
 				new int[]{2, 0},
 				input1.getType(),
 				input2.getType(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 32a5c96..84df716 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -70,6 +70,7 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -305,7 +306,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			TypeExtractor.NO_INDEX,
 			getType(),
 			Utils.getCallLocationName(),
 			true);
@@ -366,7 +366,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 				1,
 				2,
 				TypeExtractor.NO_INDEX,
-				TypeExtractor.NO_INDEX,
 				getType(),
 				Utils.getCallLocationName(),
 				true);
@@ -480,8 +479,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	@PublicEvolving
 	public static class IntervalJoined<IN1, IN2, KEY> {
 
-		private static final String INTERVAL_JOIN_FUNC_NAME = "IntervalJoin";
-
 		private final KeyedStream<IN1, KEY> left;
 		private final KeyedStream<IN2, KEY> right;
 
@@ -534,33 +531,52 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 		}
 
 		/**
-		 * Completes the join operation with the user function that is executed for each joined pair
+		 * Completes the join operation with the given user function that is executed for each joined pair
 		 * of elements.
-		 * @param udf The user-defined function
-		 * @param <OUT> The output type
-		 * @return Returns a DataStream
+		 *
+		 * @param processJoinFunction The user-defined process join function.
+		 * @param <OUT> The output type.
+		 * @return The transformed {@link DataStream}.
 		 */
 		@PublicEvolving
-		public <OUT> DataStream<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> udf) {
-
-			ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(udf);
+		public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
+			Preconditions.checkNotNull(processJoinFunction);
 
-			TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType(
-				cleanedUdf,
-				ProcessJoinFunction.class,    // ProcessJoinFunction<IN1, IN2, OUT>
-				0,     //					    0    1    2
+			final TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType(
+				processJoinFunction,
+				ProcessJoinFunction.class,
+				0,
 				1,
 				2,
-				new int[]{0},                   // lambda input 1 type arg indices
-				new int[]{1},                   // lambda input 1 type arg indices
-				TypeExtractor.NO_INDEX,         // output arg indices
-				left.getType(),                 // input 1 type information
-				right.getType(),                // input 2 type information
-				INTERVAL_JOIN_FUNC_NAME ,
-				false
+				TypeExtractor.NO_INDEX,
+				left.getType(),
+				right.getType(),
+				Utils.getCallLocationName(),
+				true
 			);
 
-			IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
+			return process(processJoinFunction, outputType);
+		}
+
+		/**
+		 * Completes the join operation with the given user function that is executed for each joined pair
+		 * of elements. This methods allows for passing explicit type information for the output type.
+		 *
+		 * @param processJoinFunction The user-defined process join function.
+		 * @param outputType The type information for the output type.
+		 * @param <OUT> The output type.
+		 * @return The transformed {@link DataStream}.
+		 */
+		@PublicEvolving
+		public <OUT> SingleOutputStreamOperator<OUT> process(
+				ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
+				TypeInformation<OUT> outputType) {
+			Preconditions.checkNotNull(processJoinFunction);
+			Preconditions.checkNotNull(outputType);
+
+			final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
+
+			final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
 				new IntervalJoinOperator<>(
 					lowerBound,
 					upperBound,
@@ -574,8 +590,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 			return left
 				.connect(right)
 				.keyBy(keySelector1, keySelector2)
-				.transform(INTERVAL_JOIN_FUNC_NAME , resultType, operator);
-
+				.transform("Interval Join", outputType, operator);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 50c6f1a..1f09b73 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -896,7 +896,6 @@ public class WindowedStream<T, K, W extends Window> {
 			WindowFunction.class,
 			0,
 			1,
-			new int[]{2, 0},
 			new int[]{3, 0},
 			inType,
 			null,
@@ -913,7 +912,6 @@ public class WindowedStream<T, K, W extends Window> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			TypeExtractor.NO_INDEX,
 			inType,
 			functionName,
 			false);

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 5baa980..cfb5adc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -21,12 +21,16 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Collector;
 
 import org.junit.Test;
@@ -35,7 +39,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for {@link TypeFill}.
+ * Tests for handling missing type information either by calling {@code returns()} or having an
+ * explicit type information parameter.
  */
 @SuppressWarnings("serial")
 public class TypeFillTest {
@@ -43,6 +48,7 @@ public class TypeFillTest {
 	@Test
 	public void test() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
 		try {
 			env.addSource(new TestSource<Integer>()).print();
@@ -71,12 +77,53 @@ public class TypeFillTest {
 			fail();
 		} catch (Exception ignored) {}
 
+		try {
+			source.keyBy(new TestKeySelector<Long, String>()).print();
+			fail();
+		} catch (Exception ignored) {}
+
+		try {
+			source.connect(source).keyBy(new TestKeySelector<Long, String>(), new TestKeySelector<>());
+			fail();
+		} catch (Exception ignored) {}
+
+		try {
+			source.coGroup(source).where(new TestKeySelector<>()).equalTo(new TestKeySelector<>());
+			fail();
+		} catch (Exception ignored) {}
+
+		try {
+			source.join(source).where(new TestKeySelector<>()).equalTo(new TestKeySelector<>());
+			fail();
+		} catch (Exception ignored) {}
+
+		try {
+			source.keyBy((in) -> in)
+				.intervalJoin(source.keyBy((in) -> in))
+				.between(Time.milliseconds(10L), Time.milliseconds(10L))
+				.process(new TestProcessJoinFunction<>())
+				.print();
+			fail();
+		} catch (Exception ignored) {}
+
 		env.addSource(new TestSource<Integer>()).returns(Integer.class);
 		source.map(new TestMap<Long, Long>()).returns(Long.class).print();
 		source.flatMap(new TestFlatMap<Long, Long>()).returns(new TypeHint<Long>(){}).print();
 		source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns(BasicTypeInfo.INT_TYPE_INFO).print();
 		source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>())
 				.returns(BasicTypeInfo.INT_TYPE_INFO).print();
+		source.connect(source).keyBy(new TestKeySelector<>(), new TestKeySelector<>(), Types.STRING);
+		source.coGroup(source).where(new TestKeySelector<>(), Types.STRING).equalTo(new TestKeySelector<>(), Types.STRING);
+		source.join(source).where(new TestKeySelector<>(), Types.STRING).equalTo(new TestKeySelector<>(), Types.STRING);
+		source.keyBy((in) -> in)
+			.intervalJoin(source.keyBy((in) -> in))
+			.between(Time.milliseconds(10L), Time.milliseconds(10L))
+			.process(new TestProcessJoinFunction<Long, Long, String>())
+			.returns(Types.STRING);
+		source.keyBy((in) -> in)
+			.intervalJoin(source.keyBy((in) -> in))
+			.between(Time.milliseconds(10L), Time.milliseconds(10L))
+			.process(new TestProcessJoinFunction<>(), Types.STRING);
 
 		assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
 				source.map(new TestMap<Long, Long>()).returns(Long.class).getType());
@@ -142,4 +189,20 @@ public class TypeFillTest {
 		public void flatMap2(IN2 value, Collector<OUT> out) throws Exception {}
 
 	}
+
+	private static class TestKeySelector<IN, KEY> implements KeySelector<IN, KEY> {
+
+		@Override
+		public KEY getKey(IN value) throws Exception {
+			return null;
+		}
+	}
+
+	private static class TestProcessJoinFunction<IN1, IN2, OUT> extends ProcessJoinFunction<IN1, IN2, OUT> {
+
+		@Override
+		public void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception {
+			// nothing to do
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
index 453f525..21583fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -669,6 +669,39 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		compareResultAsTuples(result, expected);
 	}
 
+	@Test
+	public void testCoGroupLambda() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+			new Tuple2<>(1, "hello"),
+			new Tuple2<>(2, "what's"),
+			new Tuple2<>(2, "up")
+		);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+			new Tuple2<>(1, "not"),
+			new Tuple2<>(1, "much"),
+			new Tuple2<>(2, "really")
+		);
+		DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
+			.with((Iterable<Tuple2<Integer, String>> values1, Iterable<Tuple2<Integer, String>> values2,
+					Collector<Integer> out) -> {
+				int sum = 0;
+				for (Tuple2<Integer, String> next : values1) {
+					sum += next.f0;
+				}
+				for (Tuple2<Integer, String> next : values2) {
+					sum += next.f0;
+				}
+				out.collect(sum);
+			}).returns(Integer.class);
+		List<Integer> result = joined.collect();
+
+		String expected = "6\n3\n";
+
+		compareResultAsText(result, expected);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  UDF classes
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
index dfb3efb..f145555 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
@@ -515,4 +515,38 @@ public class MapITCase extends MultipleProgramsTestBase {
 			return value;
 		}
 	}
+
+	@Test
+	public void testMapWithLambdas() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
+		DataSet<String> mappedDs = stringDs
+			.map(Object::toString)
+			.map(s -> s.replace("1", "2"))
+			.map(Trade::new)
+			.map(Trade::toString);
+		List<String> result = mappedDs.collect();
+
+		String expected = "22\n" +
+			"22\n" +
+			"23\n" +
+			"24\n";
+
+		compareResultAsText(result, expected);
+	}
+
+	private static class Trade {
+
+		public String v;
+
+		public Trade(String v) {
+			this.v = v;
+		}
+
+		@Override
+		public String toString() {
+			return v;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
index 2d6897b..750769c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
@@ -90,7 +90,11 @@ public class ReduceITCase extends MultipleProgramsTestBase {
 
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
-				groupBy(4, 0).reduce(new Tuple5Reduce());
+				groupBy(4, 0).reduce((in1, in2) -> {
+					Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<>();
+					out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
+					return out;
+				});
 
 		List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs
 				.collect();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6d251f1..cd30e38 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,6 @@ under the License.
 		<module>flink-shaded-curator</module>
 		<module>flink-core</module>
 		<module>flink-java</module>
-		<module>flink-java8</module>
 		<module>flink-scala</module>
 		<module>flink-filesystems</module>
 		<module>flink-runtime</module>


[3/3] flink git commit: [FLINK-7251] [types] Remove the flink-java8 module and improve lambda type extraction

Posted by tw...@apache.org.
[FLINK-7251] [types] Remove the flink-java8 module and improve lambda type extraction

This commit removes the flink-java8 module and merges some tests into flink-core/flink-runtime. It ensures to have the possibility for passing explicit type information in DataStream API as a fallback. Since the tycho compiler approach was very hacky and seems not to work anymore, this commit also removes all references in the docs and quickstarts.

This closes #6120.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ddba1b69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ddba1b69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ddba1b69

Branch: refs/heads/master
Commit: ddba1b69f43cbb885e178dfaafa120f1fe196a13
Parents: 95eadfe
Author: Timo Walther <tw...@apache.org>
Authored: Mon Jun 4 12:49:43 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Fri Jul 20 08:06:07 2018 +0200

----------------------------------------------------------------------
 docs/dev/java8.md                               | 198 ----------
 docs/dev/java_lambdas.md                        | 138 +++++++
 .../api/java/typeutils/TypeExtractionUtils.java |  23 ++
 .../flink/api/java/typeutils/TypeExtractor.java | 290 ++++++--------
 .../java/typeutils/LambdaExtractionTest.java    | 340 ++++++++++++++++
 .../examples/java/relational/TPCHQuery10.java   |  28 +-
 .../examples/java/wordcount/WordCount.java      |   3 +-
 .../streaming/examples/wordcount/WordCount.java |  14 +-
 flink-java8/pom.xml                             | 225 -----------
 .../examples/java8/relational/TPCHQuery10.java  | 212 ----------
 .../examples/java8/wordcount/WordCount.java     | 124 ------
 .../examples/java8/wordcount/WordCount.java     | 124 ------
 .../java/type/lambdas/LambdaExtractionTest.java | 383 -------------------
 .../org/apache/flink/cep/CEPLambdaTest.java     | 104 -----
 .../runtime/util/JarFileCreatorLambdaTest.java  | 113 ------
 .../util/jartestprogram/FilterLambda1.java      |  41 --
 .../util/jartestprogram/FilterLambda2.java      |  39 --
 .../util/jartestprogram/FilterLambda3.java      |  39 --
 .../util/jartestprogram/FilterLambda4.java      |  38 --
 .../util/jartestprogram/UtilFunction.java       |  30 --
 .../jartestprogram/UtilFunctionWrapper.java     |  35 --
 .../runtime/util/jartestprogram/WordFilter.java |  28 --
 .../operators/lambdas/AllGroupReduceITCase.java |  59 ---
 .../java/operators/lambdas/CoGroupITCase.java   |  74 ----
 .../api/java/operators/lambdas/CrossITCase.java |  73 ----
 .../java/operators/lambdas/FilterITCase.java    |  91 -----
 .../java/operators/lambdas/FlatJoinITCase.java  |  68 ----
 .../java/operators/lambdas/FlatMapITCase.java   |  56 ---
 .../operators/lambdas/GroupReduceITCase.java    |  69 ----
 .../api/java/operators/lambdas/JoinITCase.java  |  69 ----
 .../api/java/operators/lambdas/MapITCase.java   |  74 ----
 .../java/operators/lambdas/ReduceITCase.java    | 109 ------
 .../src/test/resources/log4j-test.properties    |  19 -
 .../org/apache/flink/cep/PatternStream.java     |  16 +-
 .../java/org/apache/flink/cep/CEPITCase.java    |  35 +-
 .../flink/graph/asm/translate/Translate.java    |   4 -
 .../main/resources/archetype-resources/pom.xml  |  19 -
 .../flink/runtime/util/JarFileCreatorTest.java  |  91 ++++-
 .../jartestprogram/FilterWithIndirection.java   |  38 ++
 .../util/jartestprogram/FilterWithLambda.java   |  40 ++
 .../FilterWithMethodReference.java              |  41 ++
 .../util/jartestprogram/UtilFunction.java       |  32 ++
 .../jartestprogram/UtilFunctionWrapper.java     |  37 ++
 .../runtime/util/jartestprogram/WordFilter.java |  29 ++
 .../api/datastream/AllWindowedStream.java       |   2 -
 .../api/datastream/AsyncDataStream.java         |   1 -
 .../datastream/BroadcastConnectedStream.java    |   4 -
 .../api/datastream/CoGroupedStreams.java        |  40 +-
 .../api/datastream/ConnectedStreams.java        |  28 +-
 .../streaming/api/datastream/DataStream.java    |  16 +-
 .../api/datastream/IterativeStream.java         |   4 +
 .../streaming/api/datastream/JoinedStreams.java |  43 ++-
 .../streaming/api/datastream/KeyedStream.java   |  65 ++--
 .../api/datastream/WindowedStream.java          |   2 -
 .../flink/streaming/api/TypeFillTest.java       |  65 +++-
 .../flink/test/operators/CoGroupITCase.java     |  33 ++
 .../apache/flink/test/operators/MapITCase.java  |  34 ++
 .../flink/test/operators/ReduceITCase.java      |   6 +-
 pom.xml                                         |   1 -
 59 files changed, 1223 insertions(+), 2833 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/docs/dev/java8.md
----------------------------------------------------------------------
diff --git a/docs/dev/java8.md b/docs/dev/java8.md
deleted file mode 100644
index 8e7e643..0000000
--- a/docs/dev/java8.md
+++ /dev/null
@@ -1,198 +0,0 @@
----
-title: "Java 8"
-nav-parent_id: api-concepts
-nav-pos: 20
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Java 8 introduces several new language features designed for faster and clearer coding. With the most important feature,
-the so-called "Lambda Expressions", Java 8 opens the door to functional programming. Lambda Expressions allow for implementing and
-passing functions in a straightforward way without having to declare additional (anonymous) classes.
-
-The newest version of Flink supports the usage of Lambda Expressions for all operators of the Java API.
-This document shows how to use Lambda Expressions and describes current limitations. For a general introduction to the
-Flink API, please refer to the [Programming Guide]({{ site.baseurl }}/dev/api_concepts.html)
-
-* TOC
-{:toc}
-
-### Examples
-
-The following example illustrates how to implement a simple, inline `map()` function that squares its input using a Lambda Expression.
-The types of input `i` and output parameters of the `map()` function need not to be declared as they are inferred by the Java 8 compiler.
-
-{% highlight java %}
-env.fromElements(1, 2, 3)
-// returns the squared i
-.map(i -> i*i)
-.print();
-{% endhighlight %}
-
-The next two examples show different implementations of a function that uses a `Collector` for output.
-Functions, such as `flatMap()`, require an output type (in this case `String`) to be defined for the `Collector` in order to be type-safe.
-If the `Collector` type can not be inferred from the surrounding context, it needs to be declared in the Lambda Expression's parameter list manually.
-Otherwise the output will be treated as type `Object` which can lead to undesired behaviour.
-
-{% highlight java %}
-DataSet<Integer> input = env.fromElements(1, 2, 3);
-
-// collector type must be declared
-input.flatMap((Integer number, Collector<String> out) -> {
-    StringBuilder builder = new StringBuilder();
-    for(int i = 0; i < number; i++) {
-        builder.append("a");
-        out.collect(builder.toString());
-    }
-})
-// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
-.print();
-{% endhighlight %}
-
-{% highlight java %}
-DataSet<Integer> input = env.fromElements(1, 2, 3);
-
-// collector type must not be declared, it is inferred from the type of the dataset
-DataSet<String> manyALetters = input.flatMap((number, out) -> {
-    StringBuilder builder = new StringBuilder();
-    for(int i = 0; i < number; i++) {
-       builder.append("a");
-       out.collect(builder.toString());
-    }
-});
-
-// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
-manyALetters.print();
-{% endhighlight %}
-
-The following code demonstrates a word count which makes extensive use of Lambda Expressions.
-
-{% highlight java %}
-DataSet<String> input = env.fromElements("Please count", "the words", "but not this");
-
-// filter out strings that contain "not"
-input.filter(line -> !line.contains("not"))
-// split each line by space
-.map(line -> line.split(" "))
-// emit a pair <word,1> for each array element
-.flatMap((String[] wordArray, Collector<Tuple2<String, Integer>> out)
-    -> Arrays.stream(wordArray).forEach(t -> out.collect(new Tuple2<>(t, 1)))
-    )
-// group and sum up
-.groupBy(0).sum(1)
-// print
-.print();
-{% endhighlight %}
-
-### Compiler Limitations
-Currently, Flink only supports jobs containing Lambda Expressions completely if they are **compiled with the Eclipse JDT compiler contained in Eclipse Luna 4.4.2 (and above)**.
-
-Only the Eclipse JDT compiler preserves the generic type information necessary to use the entire Lambda Expressions feature type-safely.
-Other compilers such as the OpenJDK's and Oracle JDK's `javac` throw away all generic parameters related to Lambda Expressions. This means that types such as `Tuple2<String, Integer>` or `Collector<String>` declared as a Lambda function input or output parameter will be pruned to `Tuple2` or `Collector` in the compiled `.class` files, which is too little information for the Flink compiler.
-
-How to compile a Flink job that contains Lambda Expressions with the JDT compiler will be covered in the next section.
-
-However, it is possible to implement functions such as `map()` or `filter()` with Lambda Expressions in Java 8 compilers other than the Eclipse JDT compiler as long as the function has no `Collector`s or `Iterable`s *and* only if the function handles unparameterized types such as `Integer`, `Long`, `String`, `MyOwnClass` (types without Generics!).
-
-#### Compile Flink jobs with the Eclipse JDT compiler and Maven
-
-If you are using the Eclipse IDE, you can run and debug your Flink code within the IDE without any problems after some configuration steps. The Eclipse IDE by default compiles its Java sources with the Eclipse JDT compiler. The next section describes how to configure the Eclipse IDE.
-
-If you are using a different IDE such as IntelliJ IDEA or you want to package your Jar-File with Maven to run your job on a cluster, you need to modify your project's `pom.xml` file and build your program with Maven. The [quickstart]({{site.baseurl}}/quickstart/setup_quickstart.html) contains preconfigured Maven projects which can be used for new projects or as a reference. Uncomment the mentioned lines in your generated quickstart `pom.xml` file if you want to use Java 8 with Lambda Expressions.
-
-Alternatively, you can manually insert the following lines to your Maven `pom.xml` file. Maven will then use the Eclipse JDT compiler for compilation.
-
-{% highlight xml %}
-<!-- put these lines under "project/build/pluginManagement/plugins" of your pom.xml -->
-
-<plugin>
-    <!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
-    <artifactId>maven-compiler-plugin</artifactId>
-    <configuration>
-        <source>1.8</source>
-        <target>1.8</target>
-        <compilerId>jdt</compilerId>
-    </configuration>
-    <dependencies>
-        <!-- This dependency provides the implementation of compiler "jdt": -->
-        <dependency>
-            <groupId>org.eclipse.tycho</groupId>
-            <artifactId>tycho-compiler-jdt</artifactId>
-            <version>0.21.0</version>
-        </dependency>
-    </dependencies>
-</plugin>
-{% endhighlight %}
-
-If you are using Eclipse for development, the m2e plugin might complain about the inserted lines above and marks your `pom.xml` as invalid. If so, insert the following lines to your `pom.xml`.
-
-{% highlight xml %}
-<!-- put these lines under "project/build/pluginManagement/plugins/plugin[groupId="org.eclipse.m2e", artifactId="lifecycle-mapping"]/configuration/lifecycleMappingMetadata/pluginExecutions" of your pom.xml -->
-
-<pluginExecution>
-    <pluginExecutionFilter>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <versionRange>[3.1,)</versionRange>
-        <goals>
-            <goal>testCompile</goal>
-            <goal>compile</goal>
-        </goals>
-    </pluginExecutionFilter>
-    <action>
-        <ignore></ignore>
-    </action>
-</pluginExecution>
-{% endhighlight %}
-
-#### Run and debug Flink jobs within the Eclipse IDE
-
-First of all, make sure you are running a current version of Eclipse IDE (4.4.2 or later). Also make sure that you have a Java 8 Runtime Environment (JRE) installed in Eclipse IDE (`Window` -> `Preferences` -> `Java` -> `Installed JREs`).
-
-Create/Import your Eclipse project.
-
-If you are using Maven, you also need to change the Java version in your `pom.xml` for the `maven-compiler-plugin`. Otherwise right click the `JRE System Library` section of your project and open the `Properties` window in order to switch to a Java 8 JRE (or above) that supports Lambda Expressions.
-
-The Eclipse JDT compiler needs a special compiler flag in order to store type information in `.class` files. Open the JDT configuration file at `{project directory}/.settings/org.eclipse.jdt.core.prefs` with your favorite text editor and add the following line:
-
-{% highlight plain %}
-org.eclipse.jdt.core.compiler.codegen.lambda.genericSignature=generate
-{% endhighlight %}
-
-If not already done, also modify the Java versions of the following properties to `1.8` (or above):
-
-{% highlight plain %}
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
-org.eclipse.jdt.core.compiler.compliance=1.8
-org.eclipse.jdt.core.compiler.source=1.8
-{% endhighlight %}
-
-After you have saved the file, perform a complete project refresh in Eclipse IDE.
-
-If you are using Maven, right click your Eclipse project and select `Maven` -> `Update Project...`.
-
-You have configured everything correctly, if the following Flink program runs without exceptions:
-
-{% highlight java %}
-final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.fromElements(1, 2, 3).map((in) -> new Tuple1<String>(" " + in)).print();
-env.execute();
-{% endhighlight %}
-
-{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/docs/dev/java_lambdas.md
----------------------------------------------------------------------
diff --git a/docs/dev/java_lambdas.md b/docs/dev/java_lambdas.md
new file mode 100644
index 0000000..4b306ac
--- /dev/null
+++ b/docs/dev/java_lambdas.md
@@ -0,0 +1,138 @@
+---
+title: "Java Lambda Expressions"
+nav-parent_id: api-concepts
+nav-pos: 20
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Java 8 introduced several new language features designed for faster and clearer coding. With the most important feature,
+the so-called "Lambda Expressions", it opened the door to functional programming. Lambda expressions allow for implementing and
+passing functions in a straightforward way without having to declare additional (anonymous) classes.
+
+<span class="label label-danger">Attention</span> Flink supports the usage of lambda expressions for all operators of the Java API, however, whenever a lambda expression uses Java generics you need to declare type information *explicitly*. 
+
+This document shows how to use lambda expressions and describes current limitations. For a general introduction to the
+Flink API, please refer to the [Programming Guide]({{ site.baseurl }}/dev/api_concepts.html)
+
+### Examples and Limitations
+
+The following example illustrates how to implement a simple, inline `map()` function that squares its input using a lambda expression.
+The types of input `i` and output parameters of the `map()` function need not to be declared as they are inferred by the Java compiler.
+
+{% highlight java %}
+env.fromElements(1, 2, 3)
+// returns the squared i
+.map(i -> i*i)
+.print();
+{% endhighlight %}
+
+Flink can automatically extract the result type information from the implementation of the method signature `OUT map(IN value)` because `OUT` is not generic but `Integer`.
+
+Unfortunately, functions such as `flatMap()` with a signature `void flatMap(IN value, Collector<OUT> out)` are compiled into `void flatMap(IN value, Collector out)` by the Java compiler. This makes it impossible for Flink to infer the type information for the output type automatically.
+
+Flink will most likely throw an exception similar to the following:
+
+{% highlight plain%}
+org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
+    In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
+    An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
+    Otherwise the type has to be specified explicitly using type information.
+{% endhighlight %}
+
+In this case, the type information needs to be *specified explicitly*, otherwise the output will be treated as type `Object` which leads to unefficient serialization.
+
+{% highlight java %}
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.util.Collector;
+
+DataSet<Integer> input = env.fromElements(1, 2, 3);
+
+// collector type must be declared
+input.flatMap((Integer number, Collector<String> out) -> {
+    StringBuilder builder = new StringBuilder();
+    for(int i = 0; i < number; i++) {
+        builder.append("a");
+        out.collect(builder.toString());
+    }
+})
+// provide type information explicitly
+.returns(Types.STRING)
+// prints "a", "a", "aa", "a", "aa", "aaa"
+.print();
+{% endhighlight %}
+
+Similar problems occur when using a `map()` function with a generic return type. A method signature `Tuple2<Integer, Integer> map(Integer value)` is erasured to `Tuple2 map(Integer value)` in the example below.
+
+{% highlight java %}
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+env.fromElements(1, 2, 3)
+    .map(i -> Tuple2.of(i, i))    // no information about fields of Tuple2
+    .print();
+{% endhighlight %}
+
+In general, those problems can be solved in multiple ways:
+
+{% highlight java %}
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+// use the explicit ".returns(...)"
+env.fromElements(1, 2, 3)
+    .map(i -> Tuple2.of(i, i))
+    .returns(Types.TUPLE(Types.INT, Types.INT))
+    .print();
+
+// use a class instead
+env.fromElements(1, 2, 3)
+    .map(new MyTuple2Mapper())
+    .print();
+
+public static class MyTuple2Mapper extends MapFunction<Integer, Integer> {
+    @Override
+    public Tuple2<Integer, Integer> map(Integer i) {
+        return Tuple2.of(i, i);
+    }
+}
+
+// use an anonymous class instead
+env.fromElements(1, 2, 3)
+    .map(new MapFunction<Integer, Tuple2<Integer, Integer>> {
+        @Override
+        public Tuple2<Integer, Integer> map(Integer i) {
+            return Tuple2.of(i, i);
+        }
+    })
+    .print();
+
+// or in this example use a tuple subclass instead
+env.fromElements(1, 2, 3)
+    .map(i -> new DoubleTuple(i, i))
+    .print();
+
+public static class DoubleTuple extends Tuple2<Integer, Integer> {
+    public DoubleTuple(int f0, int f1) {
+        this.f0 = f0;
+        this.f1 = f1;
+    }
+}
+{% endhighlight %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index f005ed9..07f1e1e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -158,6 +158,7 @@ public class TypeExtractionUtils {
 	/**
 	 * Extracts type from given index from lambda. It supports nested types.
 	 *
+	 * @param baseClass SAM function that the lambda implements
 	 * @param exec lambda function to extract the type from
 	 * @param lambdaTypeArgumentIndices position of type to extract in type hierarchy
 	 * @param paramLen count of total parameters of the lambda (including closure parameters)
@@ -165,14 +166,17 @@ public class TypeExtractionUtils {
 	 * @return extracted type
 	 */
 	public static Type extractTypeFromLambda(
+		Class<?> baseClass,
 		LambdaExecutable exec,
 		int[] lambdaTypeArgumentIndices,
 		int paramLen,
 		int baseParametersLen) {
 		Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]];
 		for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
+			validateLambdaType(baseClass, output);
 			output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]);
 		}
+		validateLambdaType(baseClass, output);
 		return output;
 	}
 
@@ -328,4 +332,23 @@ public class TypeExtractionUtils {
 		}
 		return Object.class;
 	}
+
+	/**
+	 * Checks whether the given type has the generic parameters declared in the class definition.
+	 *
+	 * @param t type to be validated
+	 */
+	public static void validateLambdaType(Class<?> baseClass, Type t) {
+		if (!(t instanceof Class)) {
+			return;
+		}
+		final Class<?> clazz = (Class<?>) t;
+
+		if (clazz.getTypeParameters().length > 0) {
+			throw new InvalidTypesException("The generic type parameters of '" + clazz.getSimpleName() + "' are missing. "
+				+ "In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. "
+				+ "An easy workaround is to use an (anonymous) class instead that implements the '" + baseClass.getName() + "' interface. "
+				+ "Otherwise the type has to be specified explicitly using type information.");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index f514384..07b6cfe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -84,6 +84,12 @@ import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClas
 /**
  * A utility for reflection analysis on classes, to determine the return type of implementations of transformation
  * functions.
+ *
+ * <p>NOTES FOR USERS OF THIS CLASS:
+ * Automatic type extraction is a hacky business that depends on a lot of variables such as generics,
+ * compiler, interfaces, etc. The type extraction fails regularly with either {@link MissingTypeInfo} or
+ * hard exceptions. Whenever you use methods of this class, make sure to provide a way to pass custom
+ * type information as a fallback.
  */
 @Public
 public class TypeExtractor {
@@ -171,7 +177,6 @@ public class TypeExtractor {
 			MapFunction.class,
 			0,
 			1,
-			new int[]{0},
 			NO_INDEX,
 			inType,
 			functionName,
@@ -193,7 +198,6 @@ public class TypeExtractor {
 			FlatMapFunction.class,
 			0,
 			1,
-			new int[]{0},
 			new int[]{1, 0},
 			inType,
 			functionName,
@@ -222,7 +226,6 @@ public class TypeExtractor {
 			FoldFunction.class,
 			0,
 			1,
-			new int[]{1},
 			NO_INDEX,
 			inType,
 			functionName,
@@ -241,7 +244,6 @@ public class TypeExtractor {
 			AggregateFunction.class,
 			0,
 			1,
-			new int[]{0},
 			NO_INDEX,
 			inType,
 			functionName,
@@ -261,7 +263,6 @@ public class TypeExtractor {
 			0,
 			2,
 			NO_INDEX,
-			NO_INDEX,
 			inType,
 			functionName,
 			allowMissing);
@@ -281,7 +282,6 @@ public class TypeExtractor {
 			MapPartitionFunction.class,
 			0,
 			1,
-			new int[]{0, 0},
 			new int[]{1, 0},
 			inType,
 			functionName,
@@ -302,7 +302,6 @@ public class TypeExtractor {
 			GroupReduceFunction.class,
 			0,
 			1,
-			new int[]{0, 0},
 			new int[]{1, 0},
 			inType,
 			functionName,
@@ -323,7 +322,6 @@ public class TypeExtractor {
 			GroupCombineFunction.class,
 			0,
 			1,
-			new int[]{0, 0},
 			new int[]{1, 0},
 			inType,
 			functionName,
@@ -347,8 +345,6 @@ public class TypeExtractor {
 			0,
 			1,
 			2,
-			new int[]{0},
-			new int[]{1},
 			new int[]{2, 0},
 			in1Type,
 			in2Type,
@@ -373,8 +369,6 @@ public class TypeExtractor {
 			0,
 			1,
 			2,
-			new int[]{0},
-			new int[]{1},
 			NO_INDEX,
 			in1Type,
 			in2Type,
@@ -399,8 +393,6 @@ public class TypeExtractor {
 			0,
 			1,
 			2,
-			new int[]{0, 0},
-			new int[]{1, 0},
 			new int[]{2, 0},
 			in1Type,
 			in2Type,
@@ -425,8 +417,6 @@ public class TypeExtractor {
 			0,
 			1,
 			2,
-			new int[]{0},
-			new int[]{1},
 			NO_INDEX,
 			in1Type,
 			in2Type,
@@ -448,7 +438,6 @@ public class TypeExtractor {
 			KeySelector.class,
 			0,
 			1,
-			new int[]{0},
 			NO_INDEX,
 			inType,
 			functionName,
@@ -465,46 +454,16 @@ public class TypeExtractor {
 		Partitioner<T> partitioner,
 		String functionName,
 		boolean allowMissing) {
-		try {
-			final LambdaExecutable exec;
-			try {
-				exec = checkAndExtractLambda(partitioner);
-			} catch (TypeExtractionException e) {
-				throw new InvalidTypesException("Internal error occurred.", e);
-			}
-			if (exec != null) {
-				// check for lambda type erasure
-				validateLambdaGenericParameters(exec);
-
-				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-				// paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure
-				final int paramLen = exec.getParameterTypes().length;
 
-				final Method sam = TypeExtractionUtils.getSingleAbstractMethod(Partitioner.class);
-				// number of parameters the SAM of implemented interface has; the parameter indexing applies to this range
-				final int baseParametersLen = sam.getParameterTypes().length;
-
-				final Type keyType = TypeExtractionUtils.extractTypeFromLambda(
-					exec,
-					new int[]{0},
-					paramLen,
-					baseParametersLen);
-				return new TypeExtractor().privateCreateTypeInfo(keyType, null, null);
-			} else {
-				return new TypeExtractor().privateCreateTypeInfo(
-					Partitioner.class,
-					partitioner.getClass(),
-					0,
-					null,
-					null);
-			}
-		} catch (InvalidTypesException e) {
-			if (allowMissing) {
-				return (TypeInformation<T>) new MissingTypeInfo(functionName != null ? functionName : partitioner.toString(), e);
-			} else {
-				throw e;
-			}
-		}
+		return getUnaryOperatorReturnType(
+			partitioner,
+			Partitioner.class,
+			-1,
+			0,
+			new int[]{0},
+			null,
+			functionName,
+			allowMissing);
 	}
 
 
@@ -524,24 +483,43 @@ public class TypeExtractor {
 	/**
 	 * Returns the unary operator's return type.
 	 *
-	 * <p><b>NOTE:</b> lambda type indices allow extraction of Type from lambdas. To extract input type <b>IN</b>
-	 * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInputTypeArgumentIndices.
+	 * <p>This method can extract a type in 4 different ways:
+	 *
+	 * <p>1. By using the generics of the base class like MyFunction<X, Y, Z, IN, OUT>.
+	 *    This is what outputTypeArgumentIndex (in this example "4") is good for.
+	 *
+	 * <p>2. By using input type inference SubMyFunction<T, String, String, String, T>.
+	 *    This is what inputTypeArgumentIndex (in this example "0") and inType is good for.
+	 *
+	 * <p>3. By using the static method that a compiler generates for Java lambdas.
+	 *    This is what lambdaOutputTypeArgumentIndices is good for. Given that MyFunction has
+	 *    the following single abstract method:
 	 *
 	 * <pre>
 	 * <code>
-	 * OUT apply(Map<String, List<IN>> value)
+	 * void apply(IN value, Collector<OUT> value)
 	 * </code>
 	 * </pre>
 	 *
+	 * <p> Lambda type indices allow the extraction of a type from lambdas. To extract the
+	 *     output type <b>OUT</b> from the function one should pass {@code new int[] {1, 0}}.
+	 *     "1" for selecting the parameter and 0 for the first generic in this type.
+	 *     Use {@code TypeExtractor.NO_INDEX} for selecting the return type of the lambda for
+	 *     extraction or if the class cannot be a lambda because it is not a single abstract
+	 *     method interface.
+	 *
+	 * <p>4. By using interfaces such as {@link TypeInfoFactory} or {@link ResultTypeQueryable}.
+	 *
+	 * <p>See also comments in the header of this class.
+	 *
 	 * @param function Function to extract the return type from
 	 * @param baseClass Base class of the function
-	 * @param inputTypeArgumentIndex Index of input type in the class specification
-	 * @param outputTypeArgumentIndex Index of output type in the class specification
-	 * @param lambdaInputTypeArgumentIndices Table of indices of the type argument specifying the input type. See example.
+	 * @param inputTypeArgumentIndex Index of input generic type in the base class specification (ignored if inType is null)
+	 * @param outputTypeArgumentIndex Index of output generic type in the base class specification
 	 * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the input type. See example.
-	 * @param inType Type of the input elements (In case of an iterable, it is the element type)
+	 * @param inType Type of the input elements (In case of an iterable, it is the element type) or null
 	 * @param functionName Function name
-	 * @param allowMissing Can the type information be missing
+	 * @param allowMissing Can the type information be missing (this generates a MissingTypeInfo for postponing an exception)
 	 * @param <IN> Input type
 	 * @param <OUT> Output type
 	 * @return TypeInformation of the return type of the function
@@ -553,11 +531,23 @@ public class TypeExtractor {
 		Class<?> baseClass,
 		int inputTypeArgumentIndex,
 		int outputTypeArgumentIndex,
-		int[] lambdaInputTypeArgumentIndices,
 		int[] lambdaOutputTypeArgumentIndices,
 		TypeInformation<IN> inType,
 		String functionName,
 		boolean allowMissing) {
+
+		Preconditions.checkArgument(inType == null || inputTypeArgumentIndex >= 0, "Input type argument index was not provided");
+		Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
+		Preconditions.checkArgument(
+			lambdaOutputTypeArgumentIndices != null,
+			"Indices for output type arguments within lambda not provided");
+
+		// explicit result type has highest precedence
+		if (function instanceof ResultTypeQueryable) {
+			return ((ResultTypeQueryable<OUT>) function).getProducedType();
+		}
+
+		// perform extraction
 		try {
 			final LambdaExecutable exec;
 			try {
@@ -566,14 +556,6 @@ public class TypeExtractor {
 				throw new InvalidTypesException("Internal error occurred.", e);
 			}
 			if (exec != null) {
-				Preconditions.checkArgument(
-					lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1,
-					"Indices for input type arguments within lambda not provided");
-				Preconditions.checkArgument(
-					lambdaOutputTypeArgumentIndices != null,
-					"Indices for output type arguments within lambda not provided");
-				// check for lambda type erasure
-				validateLambdaGenericParameters(exec);
 
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
 				// paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure
@@ -584,43 +566,23 @@ public class TypeExtractor {
 				// number of parameters the SAM of implemented interface has; the parameter indexing applies to this range
 				final int baseParametersLen = sam.getParameterTypes().length;
 
-				// executable references "this" implicitly
-				if (paramLen <= 0) {
-					// executable declaring class can also be a super class of the input type
-					// we only validate if the executable exists in input type
-					validateInputContainsExecutable(exec, inType);
-				}
-				else {
-					final Type input = TypeExtractionUtils.extractTypeFromLambda(
-						exec,
-						lambdaInputTypeArgumentIndices,
-						paramLen,
-						baseParametersLen);
-					validateInputType(input, inType);
-				}
-
-				if (function instanceof ResultTypeQueryable) {
-					return ((ResultTypeQueryable<OUT>) function).getProducedType();
-				}
-
 				final Type output;
 				if (lambdaOutputTypeArgumentIndices.length > 0) {
 					output = TypeExtractionUtils.extractTypeFromLambda(
+						baseClass,
 						exec,
 						lambdaOutputTypeArgumentIndices,
 						paramLen,
 						baseParametersLen);
 				} else {
 					output = exec.getReturnType();
+					TypeExtractionUtils.validateLambdaType(baseClass, output);
 				}
 
 				return new TypeExtractor().privateCreateTypeInfo(output, inType, null);
 			} else {
-				Preconditions.checkArgument(inputTypeArgumentIndex >= 0, "Input type argument index was not provided");
-				Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
-				validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType);
-				if(function instanceof ResultTypeQueryable) {
-					return ((ResultTypeQueryable<OUT>) function).getProducedType();
+				if (inType != null) {
+					validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType);
 				}
 				return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, inType, null);
 			}
@@ -637,27 +599,45 @@ public class TypeExtractor {
 	/**
 	 * Returns the binary operator's return type.
 	 *
-	 * <p><b>NOTE:</b> lambda type indices allows extraction of Type from lambdas. To extract input type <b>IN1</b>
-	 * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInput1TypeArgumentIndices.
+	 * <p>This method can extract a type in 4 different ways:
+	 *
+	 * <p>1. By using the generics of the base class like MyFunction<X, Y, Z, IN, OUT>.
+	 *    This is what outputTypeArgumentIndex (in this example "4") is good for.
+	 *
+	 * <p>2. By using input type inference SubMyFunction<T, String, String, String, T>.
+	 *    This is what inputTypeArgumentIndex (in this example "0") and inType is good for.
+	 *
+	 * <p>3. By using the static method that a compiler generates for Java lambdas.
+	 *    This is what lambdaOutputTypeArgumentIndices is good for. Given that MyFunction has
+	 *    the following single abstract method:
 	 *
 	 * <pre>
 	 * <code>
-	 * OUT apply(Map<String, List<IN1>> value1, List<IN2> value2)
+	 * void apply(IN value, Collector<OUT> value)
 	 * </code>
 	 * </pre>
 	 *
+	 * <p> Lambda type indices allow the extraction of a type from lambdas. To extract the
+	 *     output type <b>OUT</b> from the function one should pass {@code new int[] {1, 0}}.
+	 *     "1" for selecting the parameter and 0 for the first generic in this type.
+	 *     Use {@code TypeExtractor.NO_INDEX} for selecting the return type of the lambda for
+	 *     extraction or if the class cannot be a lambda because it is not a single abstract
+	 *     method interface.
+	 *
+	 * <p>4. By using interfaces such as {@link TypeInfoFactory} or {@link ResultTypeQueryable}.
+	 *
+	 * <p>See also comments in the header of this class.
+	 *
 	 * @param function Function to extract the return type from
 	 * @param baseClass Base class of the function
-	 * @param input1TypeArgumentIndex Index of first input type in the class specification
-	 * @param input2TypeArgumentIndex Index of second input type in the class specification
-	 * @param outputTypeArgumentIndex Index of output type in the class specification
-	 * @param lambdaInput1TypeArgumentIndices Table of indices of the type argument specifying the first input type. See example.
-	 * @param lambdaInput2TypeArgumentIndices Table of indices of the type argument specifying the second input type. See example.
+	 * @param input1TypeArgumentIndex Index of first input generic type in the class specification (ignored if in1Type is null)
+	 * @param input2TypeArgumentIndex Index of second input generic type in the class specification (ignored if in2Type is null)
+	 * @param outputTypeArgumentIndex Index of output generic type in the class specification
 	 * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the output type. See example.
 	 * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type)
 	 * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type)
 	 * @param functionName Function name
-	 * @param allowMissing Can the type information be missing
+	 * @param allowMissing Can the type information be missing (this generates a MissingTypeInfo for postponing an exception)
 	 * @param <IN1> Left side input type
 	 * @param <IN2> Right side input type
 	 * @param <OUT> Output type
@@ -671,13 +651,25 @@ public class TypeExtractor {
 		int input1TypeArgumentIndex,
 		int input2TypeArgumentIndex,
 		int outputTypeArgumentIndex,
-		int[] lambdaInput1TypeArgumentIndices,
-		int[] lambdaInput2TypeArgumentIndices,
 		int[] lambdaOutputTypeArgumentIndices,
 		TypeInformation<IN1> in1Type,
 		TypeInformation<IN2> in2Type,
 		String functionName,
 		boolean allowMissing) {
+
+		Preconditions.checkArgument(in1Type == null || input1TypeArgumentIndex >= 0, "Input 1 type argument index was not provided");
+		Preconditions.checkArgument(in2Type == null || input2TypeArgumentIndex >= 0, "Input 2 type argument index was not provided");
+		Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
+		Preconditions.checkArgument(
+			lambdaOutputTypeArgumentIndices != null,
+			"Indices for output type arguments within lambda not provided");
+
+		// explicit result type has highest precedence
+		if (function instanceof ResultTypeQueryable) {
+			return ((ResultTypeQueryable<OUT>) function).getProducedType();
+		}
+
+		// perform extraction
 		try {
 			final LambdaExecutable exec;
 			try {
@@ -686,17 +678,6 @@ public class TypeExtractor {
 				throw new InvalidTypesException("Internal error occurred.", e);
 			}
 			if (exec != null) {
-				Preconditions.checkArgument(
-					lambdaInput1TypeArgumentIndices != null && lambdaInput1TypeArgumentIndices.length >= 1,
-					"Indices for first input type arguments within lambda not provided");
-				Preconditions.checkArgument(
-					lambdaInput2TypeArgumentIndices != null && lambdaInput2TypeArgumentIndices.length >= 1,
-					"Indices for second input type arguments within lambda not provided");
-				Preconditions.checkArgument(
-					lambdaOutputTypeArgumentIndices != null,
-					"Indices for output type arguments within lambda not provided");
-				// check for lambda type erasure
-				validateLambdaGenericParameters(exec);
 
 				final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
 				final int baseParametersLen = sam.getParameterTypes().length;
@@ -704,32 +685,17 @@ public class TypeExtractor {
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
 				final int paramLen = exec.getParameterTypes().length;
 
-				final Type input1 = TypeExtractionUtils.extractTypeFromLambda(
-					exec,
-					lambdaInput1TypeArgumentIndices,
-					paramLen,
-					baseParametersLen);
-				final Type input2 = TypeExtractionUtils.extractTypeFromLambda(
-					exec,
-					lambdaInput2TypeArgumentIndices,
-					paramLen,
-					baseParametersLen);
-
-				validateInputType(input1, in1Type);
-				validateInputType(input2, in2Type);
-				if(function instanceof ResultTypeQueryable) {
-					return ((ResultTypeQueryable<OUT>) function).getProducedType();
-				}
-
 				final Type output;
 				if (lambdaOutputTypeArgumentIndices.length > 0) {
 					output = TypeExtractionUtils.extractTypeFromLambda(
+						baseClass,
 						exec,
 						lambdaOutputTypeArgumentIndices,
 						paramLen,
 						baseParametersLen);
 				} else {
 					output = exec.getReturnType();
+					TypeExtractionUtils.validateLambdaType(baseClass, output);
 				}
 
 				return new TypeExtractor().privateCreateTypeInfo(
@@ -738,13 +704,11 @@ public class TypeExtractor {
 					in2Type);
 			}
 			else {
-				Preconditions.checkArgument(input1TypeArgumentIndex >= 0, "Input 1 type argument index was not provided");
-				Preconditions.checkArgument(input2TypeArgumentIndex >= 0, "Input 2 type argument index was not provided");
-				Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
-				validateInputType(baseClass, function.getClass(), input1TypeArgumentIndex, in1Type);
-				validateInputType(baseClass, function.getClass(), input2TypeArgumentIndex, in2Type);
-				if(function instanceof ResultTypeQueryable) {
-					return ((ResultTypeQueryable<OUT>) function).getProducedType();
+				if (in1Type != null) {
+					validateInputType(baseClass, function.getClass(), input1TypeArgumentIndex, in1Type);
+				}
+				if (in2Type != null) {
+					validateInputType(baseClass, function.getClass(), input2TypeArgumentIndex, in2Type);
 				}
 				return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, in1Type, in2Type);
 			}
@@ -915,9 +879,10 @@ public class TypeExtractor {
 					return typeInfo;
 				} else {
 					throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) t).getName() + "' in '"
-							+ ((TypeVariable<?>) t).getGenericDeclaration() + "' could not be determined. This is most likely a type erasure problem. "
-							+ "The type extraction currently supports types with generic variables only in cases where "
-							+ "all variables in the return type can be deduced from the input type(s).");
+						+ ((TypeVariable<?>) t).getGenericDeclaration() + "' could not be determined. This is most likely a type erasure problem. "
+						+ "The type extraction currently supports types with generic variables only in cases where "
+						+ "all variables in the return type can be deduced from the input type(s). "
+						+ "Otherwise the type has to be specified explicitly using type information.");
 				}
 			}
 		}
@@ -1165,10 +1130,11 @@ public class TypeExtractor {
 				// variable could not be determined
 				if (subTypesInfo[i] == null && !lenient) {
 					throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
-							+ ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
-							+ "' could not be determined. This is most likely a type erasure problem. "
-							+ "The type extraction currently supports types with generic variables only in cases where "
-							+ "all variables in the return type can be deduced from the input type(s).");
+						+ ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
+						+ "' could not be determined. This is most likely a type erasure problem. "
+						+ "The type extraction currently supports types with generic variables only in cases where "
+						+ "all variables in the return type can be deduced from the input type(s). "
+						+ "Otherwise the type has to be specified explicitly using type information.");
 				}
 			} else {
 				// create the type information of the subtype or null/exception
@@ -1618,30 +1584,6 @@ public class TypeExtractor {
 		return fieldCount;
 	}
 
-	private static void validateLambdaGenericParameters(LambdaExecutable exec) {
-		// check the arguments
-		for (Type t : exec.getParameterTypes()) {
-			validateLambdaGenericParameter(t);
-		}
-
-		// check the return type
-		validateLambdaGenericParameter(exec.getReturnType());
-	}
-
-	private static void validateLambdaGenericParameter(Type t) {
-		if(!(t instanceof Class)) {
-			return;
-		}
-		final Class<?> clazz = (Class<?>) t;
-
-		if(clazz.getTypeParameters().length > 0) {
-			throw new InvalidTypesException("The generic type parameters of '" + clazz.getSimpleName() + "' are missing. \n"
-					+ "It seems that your compiler has not stored them into the .class file. \n"
-					+ "Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. \n"
-					+ "See the documentation for more information about how to compile jobs containing lambda expressions.");
-		}
-	}
-
 	/**
 	 * Tries to find a concrete value (Class, ParameterizedType etc. ) for a TypeVariable by traversing the type hierarchy downwards.
 	 * If a value could not be found it will return the most bottom type variable in the hierarchy.

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java
new file mode 100644
index 0000000..1d5cf22
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the type extractor for lambda functions. Many tests only work if the compiler supports
+ * lambdas properly otherwise a MissingTypeInfo is returned.
+ */
+public class LambdaExtractionTest {
+
+	private static final TypeInformation<Tuple2<Tuple1<Integer>, Boolean>> NESTED_TUPLE_BOOLEAN_TYPE =
+			new TypeHint<Tuple2<Tuple1<Integer>, Boolean>>(){}.getTypeInfo();
+
+	private static final TypeInformation<Tuple2<Tuple1<Integer>, Double>> NESTED_TUPLE_DOUBLE_TYPE =
+			new TypeHint<Tuple2<Tuple1<Integer>, Double>>(){}.getTypeInfo();
+
+	@Test
+	@SuppressWarnings({"Convert2Lambda", "Anonymous2MethodRef"})
+	public void testIdentifyLambdas() throws TypeExtractionException {
+		MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
+			@Override
+			public Integer map(String value) {
+				return Integer.parseInt(value);
+			}
+		};
+
+		MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
+			@Override
+			public Integer map(String value) {
+				return Integer.parseInt(value);
+			}
+		};
+
+		MapFunction<?, ?> fromProperClass = new StaticMapper();
+
+		MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
+			@Override
+			public Tuple2<Integer, Long> map(Integer value) {
+				return new Tuple2<>(value, 1L);
+			}
+		};
+
+		MapFunction<String, Integer> staticLambda = Integer::parseInt;
+		MapFunction<Integer, String> instanceLambda = Object::toString;
+		MapFunction<String, Integer> constructorLambda = Integer::new;
+
+		assertNull(checkAndExtractLambda(anonymousFromInterface));
+		assertNull(checkAndExtractLambda(anonymousFromClass));
+		assertNull(checkAndExtractLambda(fromProperClass));
+		assertNull(checkAndExtractLambda(fromDerived));
+		assertNotNull(checkAndExtractLambda(staticLambda));
+		assertNotNull(checkAndExtractLambda(instanceLambda));
+		assertNotNull(checkAndExtractLambda(constructorLambda));
+		assertNotNull(checkAndExtractLambda(STATIC_LAMBDA));
+	}
+
+	private static class StaticMapper implements MapFunction<String, Integer> {
+		@Override
+		public Integer map(String value) {
+			return Integer.parseInt(value);
+		}
+	}
+
+	private interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
+		@Override
+		Tuple2<T, Long> map(T value) throws Exception;
+	}
+
+	private static final MapFunction<String, Integer> STATIC_LAMBDA = Integer::parseInt;
+
+	private static class MyClass {
+		private String s = "mystring";
+
+		public MapFunction<Integer, String> getMapFunction() {
+			return (i) -> s;
+		}
+	}
+
+	@Test
+	public void testLambdaWithMemberVariable() {
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), Types.INT);
+		assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
+	}
+
+	@Test
+	public void testLambdaWithLocalVariable() {
+		String s = "mystring";
+		final int k = 24;
+		int j = 26;
+
+		MapFunction<Integer, String> f = (i) -> s + k + j;
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, Types.INT);
+		assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
+	}
+
+	@Test
+	public void testLambdaWithNonGenericResultType() {
+		MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Boolean> f = (i) -> null;
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, null, true);
+		assertTrue(ti instanceof BasicTypeInfo);
+		assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
+	}
+
+	@Test
+	public void testMapLambda() {
+		MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@Test
+	public void testFlatMapLambda() {
+		FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, out) -> out.collect(null);
+
+		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@Test
+	public void testMapPartitionLambda() {
+		MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
+
+		TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@Test
+	public void testJoinLambda() {
+		JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
+
+		TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@Test
+	public void testCoGroupLambda() {
+		CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
+
+		TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@Test
+	public void testKeySelectorLambda() {
+		KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
+
+		TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, null, true);
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testLambdaTypeErasure() {
+		MapFunction<Tuple1<Integer>, Tuple1> f = (i) -> null;
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, new TypeHint<Tuple1<Integer>>(){}.getTypeInfo(), null, true);
+		assertTrue(ti instanceof MissingTypeInfo);
+	}
+
+	@Test
+	public void testPartitionerLambda() {
+		Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % numPartitions;
+		final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner, null, true);
+
+		if (!(ti instanceof MissingTypeInfo)) {
+			assertTrue(ti.isTupleType());
+			assertEquals(2, ti.getArity());
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(0), BasicTypeInfo.INT_TYPE_INFO);
+			assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+		}
+	}
+
+	private static class MyType {
+		private int key;
+
+		public int getKey() {
+			return key;
+		}
+
+		public void setKey(int key) {
+			this.key = key;
+		}
+
+		protected int getKey2() {
+			return 0;
+		}
+	}
+
+	@Test
+	public void testInstanceMethodRefSameType() {
+		MapFunction<MyType, Integer> f = MyType::getKey;
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MyType.class));
+		assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+	}
+
+	@Test
+	public void testInstanceMethodRefSuperType() {
+		MapFunction<Integer, String> f = Object::toString;
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.INT_TYPE_INFO);
+		assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
+	}
+
+	private static class MySubtype extends MyType {
+		public boolean test;
+	}
+
+	@Test
+	public void testInstanceMethodRefSuperTypeProtected() {
+		MapFunction<MySubtype, Integer> f = MyType::getKey2;
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MySubtype.class));
+		assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+	}
+
+	@Test
+	public void testConstructorMethodRef() {
+		MapFunction<String, Integer> f = Integer::new;
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.STRING_TYPE_INFO);
+		assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+	}
+
+	private interface InterfaceWithDefaultMethod {
+		void samMethod();
+
+		default void defaultMethod() {
+
+		}
+	}
+
+	@Test
+	public void testSamMethodExtractionInterfaceWithDefaultMethod() {
+		final Method sam = TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithDefaultMethod.class);
+		assertNotNull(sam);
+		assertEquals("samMethod", sam.getName());
+	}
+
+	private interface InterfaceWithMultipleMethods {
+		void firstMethod();
+
+		void secondMethod();
+	}
+
+	@Test(expected = InvalidTypesException.class)
+	public void getSingleAbstractMethodMultipleMethods() {
+		TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithMultipleMethods.class);
+	}
+
+	private interface InterfaceWithoutAbstractMethod {
+		default void defaultMethod() {
+
+		}
+	}
+
+	@Test(expected = InvalidTypesException.class)
+	public void testSingleAbstractMethodNoAbstractMethods() {
+		TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithoutAbstractMethod.class);
+	}
+
+	private abstract class AbstractClassWithSingleAbstractMethod {
+		public abstract void defaultMethod();
+	}
+
+	@Test(expected = InvalidTypesException.class)
+	public void testSingleAbstractMethodNotAnInterface() {
+		TypeExtractionUtils.getSingleAbstractMethod(AbstractClassWithSingleAbstractMethod.class);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
index c585e82..0874999 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.examples.java.relational;
 
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
@@ -118,33 +117,18 @@ public class TPCHQuery10 {
 		// orders filtered by year: (orderkey, custkey)
 		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
 				// filter by year
-				orders.filter(
-								new FilterFunction<Tuple3<Integer, Integer, String>>() {
-									@Override
-									public boolean filter(Tuple3<Integer, Integer, String> o) {
-										return Integer.parseInt(o.f2.substring(0, 4)) > 1990;
-									}
-								})
+				orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
 				// project fields out that are no longer required
 				.project(0, 1);
 
 		// lineitems filtered by flag: (orderkey, revenue)
 		DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag =
 				// filter by flag
-				lineitems.filter(new FilterFunction<Tuple4<Integer, Double, Double, String>>() {
-										@Override
-										public boolean filter(Tuple4<Integer, Double, Double, String> l) {
-											return l.f3.equals("R");
-										}
-								})
+				lineitems.filter(lineitem -> lineitem.f3.equals("R"))
 				// compute revenue and project out return flag
-				.map(new MapFunction<Tuple4<Integer, Double, Double, String>, Tuple2<Integer, Double>>() {
-							@Override
-							public Tuple2<Integer, Double> map(Tuple4<Integer, Double, Double, String> l) {
-								// revenue per item = l_extendedprice * (1 - l_discount)
-								return new Tuple2<Integer, Double>(l.f0, l.f1 * (1 - l.f2));
-							}
-					});
+				// revenue per item = l_extendedprice * (1 - l_discount)
+				.map(lineitem -> new Tuple2<>(lineitem.f0, lineitem.f1 * (1 - lineitem.f2)))
+				.returns(Types.TUPLE(Types.INT, Types.DOUBLE)); // for lambda with generics
 
 		// join orders with lineitems: (custkey, revenue)
 		DataSet<Tuple2<Integer, Double>> revenueByCustomer =

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index c494c6f..2882fc7 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -43,7 +43,6 @@ import org.apache.flink.util.Collector;
  * </ul>
  *
  */
-@SuppressWarnings("serial")
 public class WordCount {
 
 	// *************************************************************************
@@ -110,7 +109,7 @@ public class WordCount {
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
+					out.collect(new Tuple2<>(token, 1));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index 9c7c435..ad34d71 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -72,10 +72,10 @@ public class WordCount {
 		}
 
 		DataStream<Tuple2<String, Integer>> counts =
-		// split up the lines in pairs (2-tuples) containing: (word,1)
-		text.flatMap(new Tokenizer())
-		// group by the tuple field "0" and sum up tuple field "1"
-				.keyBy(0).sum(1);
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			text.flatMap(new Tokenizer())
+			// group by the tuple field "0" and sum up tuple field "1"
+			.keyBy(0).sum(1);
 
 		// emit result
 		if (params.has("output")) {
@@ -100,18 +100,16 @@ public class WordCount {
 	 * Integer>}).
 	 */
 	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
-				throws Exception {
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
 			// normalize and split the line
 			String[] tokens = value.toLowerCase().split("\\W+");
 
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
+					out.collect(new Tuple2<>(token, 1));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
deleted file mode 100644
index 62504f9..0000000
--- a/flink-java8/pom.xml
+++ /dev/null
@@ -1,225 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-parent</artifactId>
-		<version>1.7-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-java8_${scala.binary.version}</artifactId>
-	<name>flink-java8</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-
-		<!-- core dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<!-- test dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-cep_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<!-- just define the Java version to be used for compiling and plugins -->
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.1</version><!--$NO-MVN-MAN-VER$-->
-				<configuration>
-					<source>1.8</source>
-					<target>1.8</target>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<systemPropertyVariables>
-						<log.level>WARN</log.level>
-					</systemPropertyVariables>
-				</configuration>
-			</plugin>
-
-			<!-- get default data from flink-examples-batch package -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-dependency-plugin</artifactId>
-				<version>2.9</version><!--$NO-MVN-MAN-VER$-->
-				<executions>
-					<execution>
-						<id>unpack</id>
-						<phase>prepare-package</phase>
-						<goals>
-							<goal>unpack</goal>
-						</goals>
-						<configuration>
-							<artifactItems>
-								<artifactItem>
-									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
-									<version>${project.version}</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
-								</artifactItem>
-							</artifactItems>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	
-		<pluginManagement>
-			<plugins>
-				<plugin>
-					<!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
-					<artifactId>maven-compiler-plugin</artifactId>
-					<configuration>
-						<source>1.8</source>
-						<target>1.8</target>
-						<compilerId>jdt</compilerId>
-					</configuration>
-					<dependencies>
-						<!-- This dependency provides the implementation of compiler "jdt": -->
-						<dependency>
-							<groupId>org.eclipse.tycho</groupId>
-							<artifactId>tycho-compiler-jdt</artifactId>
-							<version>0.21.0</version>
-						</dependency>
-					</dependencies>
-				</plugin>
-				<plugin>
-					<!-- Skip the deployment of the Java8 artifact -->
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-deploy-plugin</artifactId>
-					<version>2.4</version><!--$NO-MVN-MAN-VER$-->
-					<configuration>
-						<skip>true</skip>
-					</configuration>
-				</plugin>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-assembly-plugin</artifactId>
-										<versionRange>[2.4,)</versionRange>
-										<goals>
-											<goal>single</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-compiler-plugin</artifactId>
-										<versionRange>[3.1,)</versionRange>
-										<goals>
-											<goal>testCompile</goal>
-											<goal>compile</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-dependency-plugin</artifactId>
-										<versionRange>[2.9,)</versionRange>
-										<goals>
-											<goal>unpack</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
deleted file mode 100644
index c0fce4d..0000000
--- a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java8.relational;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-
-/**
- * This program implements a modified version of the TPC-H query 10.
- * The original query can be found at
- * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
- *
- * <p>This program implements the following SQL equivalent:
- *
- * <p><pre>{@code
- * SELECT
- *        c_custkey,
- *        c_name,
- *        c_address,
- *        n_name,
- *        c_acctbal
- *        SUM(l_extendedprice * (1 - l_discount)) AS revenue,
- * FROM
- *        customer,
- *        orders,
- *        lineitem,
- *        nation
- * WHERE
- *        c_custkey = o_custkey
- *        AND l_orderkey = o_orderkey
- *        AND YEAR(o_orderdate) > '1990'
- *        AND l_returnflag = 'R'
- *        AND c_nationkey = n_nationkey
- * GROUP BY
- *        c_custkey,
- *        c_name,
- *        c_acctbal,
- *        n_name,
- *        c_address
- * }</pre>
- *
- * <p>Compared to the original TPC-H query this version does not print
- * c_phone and c_comment, only filters by years greater than 1990 instead of
- * a period of 3 months, and does not sort the result by revenue.
- *
- * <p>Input files are plain text CSV files using the pipe character ('|') as field separator
- * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- *
- * <p>Usage: <code>TPCHQuery10 &lt;customer-csv path&gt; &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;nation-csv path&gt; &lt;result path&gt;</code><br>
- *
- * <p>This example shows how to use:
- * <ul>
- * <li> inline-defined functions using Java 8 Lambda Expressions
- * </ul>
- */
-public class TPCHQuery10 {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get customer data set: (custkey, name, address, nationkey, acctbal)
-		DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
-
-		// get orders data set: (orderkey, custkey, orderdate)
-		DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
-
-		// get lineitem data set: (orderkey, extendedprice, discount, returnflag)
-		DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
-
-		// get nation data set: (nationkey, name)
-		DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
-
-		// orders filtered by year: (orderkey, custkey)
-		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
-				// filter by year
-				orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
-				// project fields out that are no longer required
-				.project(0, 1);
-
-		// lineitems filtered by flag: (orderkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag =
-				// filter by flag
-				lineitems.filter(lineitem -> lineitem.f3.equals("R"))
-				// project fields out that are no longer required
-				.project(0, 1, 2);
-
-		// join orders with lineitems: (custkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey =
-				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
-									.where(0).equalTo(0)
-									.projectFirst(1).projectSecond(1, 2);
-
-		// aggregate for revenue: (custkey, revenue)
-		DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
-				// calculate the revenue for each item
-				// revenue per item = l_extendedprice * (1 - l_discount)
-				.map(i -> new Tuple2<>(i.f0, i.f1 * (1 - i.f2)))
-				// aggregate the revenues per item to revenue per customer
-				.groupBy(0).sum(1);
-
-		// join customer with nation (custkey, name, address, nationname, acctbal)
-		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
-						.joinWithTiny(nations)
-						.where(3).equalTo(0)
-						.projectFirst(0, 1, 2).projectSecond(1).projectFirst(4);
-
-		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
-		DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue =
-				customerWithNation.join(revenueOfCustomerKey)
-				.where(0).equalTo(0)
-				.projectFirst(0, 1, 2, 3, 4).projectSecond(1);
-
-		// emit result
-		customerWithRevenue.writeAsCsv(outputPath);
-
-		// execute program
-		env.execute("TPCH Query 10 Example");
-
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static String customerPath;
-	private static String ordersPath;
-	private static String lineitemPath;
-	private static String nationPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] programArguments) {
-
-		if (programArguments.length > 0) {
-			if (programArguments.length == 5) {
-				customerPath = programArguments[0];
-				ordersPath = programArguments[1];
-				lineitemPath = programArguments[2];
-				nationPath = programArguments[3];
-				outputPath = programArguments[4];
-			} else {
-				System.err.println("Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
-				return false;
-			}
-		} else {
-			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-								"  Due to legal restrictions, we can not ship generated data.\n" +
-								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
-								"  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
-			return false;
-		}
-		return true;
-	}
-
-	private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(customerPath)
-					.fieldDelimiter("|")
-					.includeFields("11110100")
-					.types(Integer.class, String.class, String.class, Integer.class, Double.class);
-	}
-
-	private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(ordersPath)
-					.fieldDelimiter("|")
-					.includeFields("110010000")
-					.types(Integer.class, Integer.class, String.class);
-	}
-
-	private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(lineitemPath)
-					.fieldDelimiter("|")
-					.includeFields("1000011010000000")
-					.types(Integer.class, Double.class, Double.class, String.class);
-	}
-
-	private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(nationPath)
-					.fieldDelimiter("|")
-					.includeFields("1100")
-					.types(Integer.class, String.class);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
deleted file mode 100644
index 8f36f66..0000000
--- a/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java8.wordcount;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.util.Collector;
-
-import java.util.Arrays;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram
- * over text files.
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>write a compact Flink program with Java 8 Lambda Expressions.
- * </ul>
- *
- */
-public class WordCount {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataSet<String> text = getTextDataSet(env);
-
-		DataSet<Tuple2<String, Integer>> counts =
-				// normalize and split each line
-				text.map(line -> line.toLowerCase().split("\\W+"))
-				// convert split line in pairs (2-tuples) containing: (word,1)
-				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
-					// emit the pairs with non-zero-length words
-					Arrays.stream(tokens)
-					.filter(t -> t.length() > 0)
-					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
-				})
-				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0)
-				.sum(1);
-
-		// emit result
-		if (fileOutput) {
-			counts.writeAsCsv(outputPath, "\n", " ");
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("WordCount Example");
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return WordCountData.getDefaultTextLineDataSet(env);
-		}
-	}
-}


[2/3] flink git commit: [FLINK-7251] [types] Remove the flink-java8 module and improve lambda type extraction

Posted by tw...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
deleted file mode 100644
index b9dba77..0000000
--- a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.java8.wordcount;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-
-import java.util.Arrays;
-
-/**
- * Implements the streaming "WordCount" program that computes a simple word occurrences
- * over text files.
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>write a compact Flink Streaming program with Java 8 Lambda Expressions.
- * </ul>
- *
- */
-public class WordCount {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataStream<String> text = getTextDataStream(env);
-
-		DataStream<Tuple2<String, Integer>> counts =
-				// normalize and split each line
-				text.map(line -> line.toLowerCase().split("\\W+"))
-				// convert split line in pairs (2-tuples) containing: (word,1)
-				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
-					// emit the pairs with non-zero-length words
-					Arrays.stream(tokens)
-					.filter(t -> t.length() > 0)
-					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
-				})
-				// group by the tuple field "0" and sum up tuple field "1"
-				.keyBy(0)
-				.sum(1);
-
-		// emit result
-		if (fileOutput) {
-			counts.writeAsCsv(outputPath);
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("Streaming WordCount Example");
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return env.fromElements(WordCountData.WORDS);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
deleted file mode 100644
index de1f395..0000000
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.type.lambdas;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.lang.reflect.Method;
-
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-/**
- * Tests the type extractor for lambda functions.
- */
-@SuppressWarnings("serial")
-public class LambdaExtractionTest {
-
-	private static final TypeInformation<Tuple2<Tuple1<Integer>, Boolean>> NESTED_TUPLE_BOOLEAN_TYPE =
-			new TypeHint<Tuple2<Tuple1<Integer>, Boolean>>(){}.getTypeInfo();
-
-	private static final TypeInformation<Tuple2<Tuple1<Integer>, Double>> NESTED_TUPLE_DOUBLE_TYPE =
-			new TypeHint<Tuple2<Tuple1<Integer>, Double>>(){}.getTypeInfo();
-
-	@Test
-	public void testIdentifyLambdas() {
-		try {
-			MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
-				@Override
-				public Integer map(String value) {
-					return Integer.parseInt(value);
-				}
-			};
-
-			MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
-				@Override
-				public Integer map(String value) {
-					return Integer.parseInt(value);
-				}
-			};
-
-			MapFunction<?, ?> fromProperClass = new StaticMapper();
-
-			MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
-				@Override
-				public Tuple2<Integer, Long> map(Integer value) {
-					return new Tuple2<>(value, 1L);
-				}
-			};
-
-			MapFunction<String, Integer> staticLambda = Integer::parseInt;
-			MapFunction<Integer, String> instanceLambda = Object::toString;
-			MapFunction<String, Integer> constructorLambda = Integer::new;
-
-			assertNull(checkAndExtractLambda(anonymousFromInterface));
-			assertNull(checkAndExtractLambda(anonymousFromClass));
-			assertNull(checkAndExtractLambda(fromProperClass));
-			assertNull(checkAndExtractLambda(fromDerived));
-			assertNotNull(checkAndExtractLambda(staticLambda));
-			assertNotNull(checkAndExtractLambda(instanceLambda));
-			assertNotNull(checkAndExtractLambda(constructorLambda));
-			assertNotNull(checkAndExtractLambda(STATIC_LAMBDA));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	private static class StaticMapper implements MapFunction<String, Integer> {
-		@Override
-		public Integer map(String value) {
-			return Integer.parseInt(value);
-		}
-	}
-
-	private interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
-		@Override
-		Tuple2<T, Long> map(T value) throws Exception;
-	}
-
-	private static final MapFunction<String, Integer> STATIC_LAMBDA = Integer::parseInt;
-
-	private static class MyClass {
-		private String s = "mystring";
-
-		public MapFunction<Integer, String> getMapFunction() {
-			return (i) -> s;
-		}
-	}
-
-	@Test
-	public void testLambdaWithMemberVariable() {
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), Types.INT);
-		Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
-	}
-
-	@Test
-	public void testLambdaWithLocalVariable() {
-		String s = "mystring";
-		final int k = 24;
-		int j = 26;
-
-		MapFunction<Integer, String> f = (i) -> s + k + j;
-
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, Types.INT);
-		Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
-	}
-
-	@Test
-	public void testMapLambda() {
-		MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
-
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testFlatMapLambda() {
-		FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
-		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testMapPartitionLambda() {
-		MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
-		TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testGroupReduceLambda() {
-		GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
-		TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testFlatJoinLambda() {
-		FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
-
-		TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testJoinLambda() {
-		JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
-
-		TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testCoGroupLambda() {
-		CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
-
-		TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testCrossLambda() {
-		CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
-
-		TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testKeySelectorLambda() {
-		KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
-
-		TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testLambdaTypeErasure() {
-		MapFunction<Tuple1<Integer>, Tuple1> f = (i) -> null;
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, new TypeHint<Tuple1<Integer>>(){}.getTypeInfo(), null, true);
-		Assert.assertTrue(ti instanceof MissingTypeInfo);
-	}
-
-	@Test
-	public void testPartitionerLambda() {
-		Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % numPartitions;
-		final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner);
-
-		Assert.assertTrue(ti.isTupleType());
-		Assert.assertEquals(2, ti.getArity());
-		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(0), BasicTypeInfo.INT_TYPE_INFO);
-		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-
-	}
-
-	private static class MyType {
-		private int key;
-
-		public int getKey() {
-			return key;
-		}
-
-		public void setKey(int key) {
-			this.key = key;
-		}
-
-		protected int getKey2() {
-			return 0;
-		}
-	}
-
-	@Test
-	public void testInstanceMethodRefSameType() {
-		MapFunction<MyType, Integer> f = MyType::getKey;
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MyType.class));
-		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
-	}
-
-	@Test
-	public void testInstanceMethodRefSuperType() {
-		MapFunction<Integer, String> f = Object::toString;
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.INT_TYPE_INFO);
-		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
-	}
-
-	private static class MySubtype extends MyType {
-		public boolean test;
-	}
-
-	@Test
-	public void testInstanceMethodRefSuperTypeProtected() {
-		MapFunction<MySubtype, Integer> f = MyType::getKey2;
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MySubtype.class));
-		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
-	}
-
-	@Test
-	public void testConstructorMethodRef() {
-		MapFunction<String, Integer> f = Integer::new;
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.STRING_TYPE_INFO);
-		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
-	}
-
-	private interface InterfaceWithDefaultMethod {
-		void samMethod();
-
-		default void defaultMethod() {
-
-		}
-	}
-
-	@Test
-	public void testSamMethodExtractionInterfaceWithDefaultMethod() {
-		final Method sam = TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithDefaultMethod.class);
-		assertNotNull(sam);
-		assertEquals("samMethod", sam.getName());
-	}
-
-	private interface InterfaceWithMultipleMethods {
-		void firstMethod();
-
-		void secondMethod();
-	}
-
-	@Test(expected = InvalidTypesException.class)
-	public void getSingleAbstractMethodMultipleMethods() throws Exception {
-		TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithMultipleMethods.class);
-	}
-
-	private interface InterfaceWithoutAbstractMethod {
-		default void defaultMethod() {
-
-		}
-	}
-
-	@Test(expected = InvalidTypesException.class)
-	public void getSingleAbstractMethodNoAbstractMethods() throws Exception {
-		TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithoutAbstractMethod.class);
-	}
-
-	private abstract class AbstractClassWithSingleAbstractMethod {
-		public abstract void defaultMethod();
-	}
-
-	@Test(expected = InvalidTypesException.class)
-	public void getSingleAbstractMethodNotAnInterface() throws Exception {
-		TypeExtractionUtils.getSingleAbstractMethod(AbstractClassWithSingleAbstractMethod.class);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
deleted file mode 100644
index 7cbdf6a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for lambda support in CEP.
- */
-public class CEPLambdaTest extends TestLogger {
-	/**
-	 * Test event class.
-	 */
-	public static class EventA {}
-
-	/**
-	 * Test event class.
-	 */
-	public static class EventB {}
-
-	/**
-	 * Tests that a Java8 lambda can be passed as a CEP select function.
-	 */
-	@Test
-	public void testLambdaSelectFunction() {
-		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
-		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
-
-		DataStream<EventA> inputStream = new DataStream<>(
-			StreamExecutionEnvironment.getExecutionEnvironment(),
-			new SourceTransformation<>(
-				"source",
-				null,
-				eventTypeInformation,
-				1));
-
-		Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
-
-		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, dummyPattern);
-
-		DataStream<EventB> result = patternStream.select(
-				(Map<String, List<EventA>> map) -> new EventB()
-		);
-
-		assertEquals(outputTypeInformation, result.getType());
-	}
-
-	/**
-	 * Tests that a Java8 lambda can be passed as a CEP flat select function.
-	 */
-	@Test
-	public void testLambdaFlatSelectFunction() {
-		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
-		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
-
-		DataStream<EventA> inputStream = new DataStream<>(
-			StreamExecutionEnvironment.getExecutionEnvironment(),
-			new SourceTransformation<>(
-				"source",
-				null,
-				eventTypeInformation,
-				1));
-
-		Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
-
-		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, dummyPattern);
-
-		DataStream<EventB> result = patternStream.flatSelect(
-			(Map<String, List<EventA>> map, Collector<EventB> collector) -> collector.collect(new EventB())
-		);
-
-		assertEquals(outputTypeInformation, result.getType());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
deleted file mode 100644
index ca11275..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda1;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda2;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda3;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda4;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.jar.JarInputStream;
-import java.util.zip.ZipEntry;
-
-/**
- * Tests for the {@link JarFileCreator}.
- */
-public class JarFileCreatorLambdaTest {
-	@Test
-	public void testFilterFunctionOnLambda1() throws Exception {
-		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
-		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(FilterLambda1.class)
-			.createJarFile();
-
-		Set<String> ans = new HashSet<String>();
-		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda1.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-
-		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
-		out.delete();
-	}
-
-	@Test
-	public void testFilterFunctionOnLambda2() throws Exception{
-		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
-		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(FilterLambda2.class)
-			.createJarFile();
-
-		Set<String> ans = new HashSet<String>();
-		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda2.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-
-		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
-		out.delete();
-	}
-
-	@Test
-	public void testFilterFunctionOnLambda3() throws Exception {
-		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
-		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(FilterLambda3.class)
-			.createJarFile();
-
-		Set<String> ans = new HashSet<String>();
-		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda3.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunction.class");
-
-		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
-		out.delete();
-	}
-
-	@Test
-	public void testFilterFunctionOnLambda4() throws Exception {
-		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
-		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(FilterLambda4.class)
-			.createJarFile();
-
-		Set<String> ans = new HashSet<String>();
-		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda4.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper$UtilFunction.class");
-
-		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
-		out.delete();
-	}
-
-	public boolean validate(Set<String> expected, File out) throws Exception {
-		int count = expected.size();
-		try (JarInputStream jis = new JarInputStream(new FileInputStream(out))) {
-			ZipEntry ze;
-			while ((ze = jis.getNextEntry()) != null) {
-				count--;
-				expected.remove(ze.getName());
-			}
-		}
-		return count == 0 && expected.size() == 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
deleted file mode 100644
index 12abff9..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * A lambda filter using a static method.
- */
-public class FilterLambda1 {
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
-		FilterFunction<String> filter = (v) -> WordFilter.filter(v);
-
-		DataSet<String> output = input.filter(filter);
-		output.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
deleted file mode 100644
index 9555607..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda1}, but the filter lambda is directly passed to {@link DataSet#filter(FilterFunction)}.
- */
-public class FilterLambda2 {
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
-		DataSet<String> output = input.filter((v) -> WordFilter.filter(v));
-		output.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
deleted file mode 100644
index b493722..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda2}, but uses a getter to retrieve a lambda filter instance.
- */
-public class FilterLambda3 {
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
-		DataSet<String> output = input.filter(UtilFunction.getWordFilter());
-		output.print();
-
-		env.execute();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
deleted file mode 100644
index 606ef5e..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda3} with additional indirection.
- */
-public class FilterLambda4 {
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
-		DataSet<String> output = input.filter(UtilFunctionWrapper.UtilFunction.getWordFilter());
-		output.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
deleted file mode 100644
index 1d5394a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * Static factory for a lambda filter function.
- */
-public class UtilFunction {
-	public static FilterFunction<String> getWordFilter() {
-		return (v) -> WordFilter.filter(v);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
deleted file mode 100644
index de8f68a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * A wrapper around {@link WordFilter} to introduce additional indirection.
- */
-public class UtilFunctionWrapper {
-	/**
-	 * Static factory for a lambda filter function.
-	 */
-	public static class UtilFunction {
-		public static FilterFunction<String> getWordFilter() {
-			return (v) -> WordFilter.filter(v);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
deleted file mode 100644
index 4a5b16f..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-/**
- * Static filter method for lambda tests.
- */
-public class WordFilter {
-	public static boolean filter(String value) {
-		return !value.contains("not");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
deleted file mode 100644
index cee34af..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda allreduce functions.
- */
-public class AllGroupReduceITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "aaabacad\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
-		DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
-			String conc = "";
-			for (String s : values) {
-				conc = conc.concat(s);
-			}
-			out.collect(conc);
-		});
-		concatDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
deleted file mode 100644
index a70f37a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cogroup functions.
- */
-public class CoGroupITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "6\n3\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-			new Tuple2<Integer, String>(1, "hello"),
-			new Tuple2<Integer, String>(2, "what's"),
-			new Tuple2<Integer, String>(2, "up")
-		);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-			new Tuple2<Integer, String>(1, "not"),
-			new Tuple2<Integer, String>(1, "much"),
-			new Tuple2<Integer, String>(2, "really")
-		);
-		DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
-			.with((values1, values2, out) -> {
-				int sum = 0;
-				for (Tuple2<Integer, String> next : values1) {
-					sum += next.f0;
-				}
-				for (Tuple2<Integer, String> next : values2) {
-					sum += next.f0;
-				}
-				out.collect(sum);
-			});
-		joined.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
deleted file mode 100644
index 32cd910..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cross functions.
- */
-public class CrossITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "2,hello not\n" +
-			"3,what's not\n" +
-			"3,up not\n" +
-			"2,hello much\n" +
-			"3,what's much\n" +
-			"3,up much\n" +
-			"3,hello really\n" +
-			"4,what's really\n" +
-			"4,up really";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-				new Tuple2<Integer, String>(1, "hello"),
-				new Tuple2<Integer, String>(2, "what's"),
-				new Tuple2<Integer, String>(2, "up")
-				);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-				new Tuple2<Integer, String>(1, "not"),
-				new Tuple2<Integer, String>(1, "much"),
-				new Tuple2<Integer, String>(2, "really")
-				);
-		DataSet<Tuple2<Integer, String>> joined = left.cross(right)
-				.with((t, s) -> new Tuple2<> (t.f0 + s.f0, t.f1 + " " + s.f1));
-		joined.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
deleted file mode 100644
index 6ad1058..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda filter functions.
- */
-public class FilterITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n";
-
-	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
-		data.add(new Tuple3<>(1, 1L, "Hi"));
-		data.add(new Tuple3<>(2, 2L, "Hello"));
-		data.add(new Tuple3<>(3, 2L, "Hello world"));
-		data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
-		data.add(new Tuple3<>(5, 3L, "I am fine."));
-		data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
-		data.add(new Tuple3<>(7, 4L, "Comment#1"));
-		data.add(new Tuple3<>(8, 4L, "Comment#2"));
-		data.add(new Tuple3<>(9, 4L, "Comment#3"));
-		data.add(new Tuple3<>(10, 4L, "Comment#4"));
-		data.add(new Tuple3<>(11, 5L, "Comment#5"));
-		data.add(new Tuple3<>(12, 5L, "Comment#6"));
-		data.add(new Tuple3<>(13, 5L, "Comment#7"));
-		data.add(new Tuple3<>(14, 5L, "Comment#8"));
-		data.add(new Tuple3<>(15, 5L, "Comment#9"));
-		data.add(new Tuple3<>(16, 6L, "Comment#10"));
-		data.add(new Tuple3<>(17, 6L, "Comment#11"));
-		data.add(new Tuple3<>(18, 6L, "Comment#12"));
-		data.add(new Tuple3<>(19, 6L, "Comment#13"));
-		data.add(new Tuple3<>(20, 6L, "Comment#14"));
-		data.add(new Tuple3<>(21, 6L, "Comment#15"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-				filter(value -> value.f2.contains("world"));
-		filterDs.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
deleted file mode 100644
index f793450..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class FlatJoinITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "2,what's really\n" +
-			"2,up really\n" +
-			"1,hello not\n" +
-			"1,hello much\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-				new Tuple2<Integer, String>(1, "hello"),
-				new Tuple2<Integer, String>(2, "what's"),
-				new Tuple2<Integer, String>(2, "up")
-				);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-				new Tuple2<Integer, String>(1, "not"),
-				new Tuple2<Integer, String>(1, "much"),
-				new Tuple2<Integer, String>(2, "really")
-				);
-		DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
-				.with((t, s, out) -> out.collect(new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1)));
-		joined.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
deleted file mode 100644
index d395d7d..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda flatmap functions.
- */
-public class FlatMapITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "bb\n" +
-			"bb\n" +
-			"bc\n" +
-			"bd\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
-		DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
-		flatMappedDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
deleted file mode 100644
index 53db541..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda groupreduce functions.
- */
-public class GroupReduceITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "abad\n" +
-			"aaac\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> stringDs = env.fromElements(
-				new Tuple2<>(1, "aa"),
-				new Tuple2<>(2, "ab"),
-				new Tuple2<>(1, "ac"),
-				new Tuple2<>(2, "ad")
-				);
-		DataSet<String> concatDs = stringDs
-				.groupBy(0)
-				.reduceGroup((values, out) -> {
-					String conc = "";
-					for (Tuple2<Integer, String> next : values) {
-						conc = conc.concat(next.f1);
-					}
-					out.collect(conc);
-				});
-		concatDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
deleted file mode 100644
index d86ea49..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class JoinITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "2,what's really\n" +
-			"2,up really\n" +
-			"1,hello not\n" +
-			"1,hello much\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-				new Tuple2<Integer, String>(1, "hello"),
-				new Tuple2<Integer, String>(2, "what's"),
-				new Tuple2<Integer, String>(2, "up")
-				);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-				new Tuple2<Integer, String>(1, "not"),
-				new Tuple2<Integer, String>(1, "much"),
-				new Tuple2<Integer, String>(2, "really")
-				);
-		DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
-				.with((t, s) -> new Tuple2<>(t.f0, t.f1 + " " + s.f1));
-		joined.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
deleted file mode 100644
index 15a9b9d..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda map functions.
- */
-public class MapITCase extends JavaProgramTestBase {
-
-	private static class Trade {
-
-		public String v;
-
-		public Trade(String v) {
-			this.v = v;
-		}
-
-		@Override
-		public String toString() {
-			return v;
-		}
-	}
-
-	private static final String EXPECTED_RESULT = "22\n" +
-			"22\n" +
-			"23\n" +
-			"24\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
-		DataSet<String> mappedDs = stringDs
-			.map(Object::toString)
-			.map (s -> s.replace("1", "2"))
-			.map(Trade::new)
-			.map(Trade::toString);
-		mappedDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
deleted file mode 100644
index 712132c..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda reduce functions.
- */
-public class ReduceITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
-			"2,3,2,Hallo Welt wie,1\n" +
-			"2,2,1,Hallo Welt,2\n" +
-			"3,9,0,P-),2\n" +
-			"3,6,5,BCD,3\n" +
-			"4,17,0,P-),1\n" +
-			"4,17,0,P-),2\n" +
-			"5,11,10,GHI,1\n" +
-			"5,29,0,P-),2\n" +
-			"5,25,0,P-),3\n";
-
-	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
-		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
-		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
-		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
-		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
-		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
-		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
-		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
-		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
-		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
-		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
-		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
-		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
-		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
-		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
-		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
-
-		Collections.shuffle(data);
-
-		TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> type = new
-				TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>>(
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO,
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
-				.groupBy(4, 0)
-				.reduce((in1, in2) -> {
-					Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
-					out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
-					return out;
-				});
-
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/resources/log4j-test.properties b/flink-java8/src/test/resources/log4j-test.properties
deleted file mode 100644
index c977d4c..0000000
--- a/flink-java8/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 0e6c2fe..521665f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -104,8 +104,7 @@ public class PatternStream<T> {
 			PatternSelectFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
-			new int[]{},
+			TypeExtractor.NO_INDEX,
 			inputStream.getType(),
 			null,
 			false);
@@ -173,8 +172,7 @@ public class PatternStream<T> {
 			PatternSelectFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
-			new int[]{},
+			TypeExtractor.NO_INDEX,
 			inputStream.getType(),
 			null,
 			false);
@@ -259,8 +257,7 @@ public class PatternStream<T> {
 			PatternSelectFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
-			new int[]{},
+			TypeExtractor.NO_INDEX,
 			inputStream.getType(),
 			null,
 			false);
@@ -270,8 +267,7 @@ public class PatternStream<T> {
 			PatternTimeoutFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
-			new int[]{},
+			TypeExtractor.NO_INDEX,
 			inputStream.getType(),
 			null,
 			false);
@@ -314,7 +310,6 @@ public class PatternStream<T> {
 			PatternFlatSelectFunction.class,
 			0,
 			1,
-			new int[] {0, 1, 0},
 			new int[] {1, 0},
 			inputStream.getType(),
 			null,
@@ -381,7 +376,6 @@ public class PatternStream<T> {
 			PatternFlatSelectFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
 			new int[]{1, 0},
 			inputStream.getType(),
 			null,
@@ -465,7 +459,6 @@ public class PatternStream<T> {
 			PatternFlatTimeoutFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
 			new int[]{2, 0},
 			inputStream.getType(),
 			null,
@@ -476,7 +469,6 @@ public class PatternStream<T> {
 			PatternFlatSelectFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
 			new int[]{1, 0},
 			inputStream.getType(),
 			null,

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 6d1013c..e397d31 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
@@ -96,19 +97,15 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		});
 
-		DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() {
+		DataStream<String> result = CEP.pattern(input, pattern).flatSelect((p, o) -> {
+			StringBuilder builder = new StringBuilder();
 
-			@Override
-			public String select(Map<String, List<Event>> pattern) {
-				StringBuilder builder = new StringBuilder();
+			builder.append(p.get("start").get(0).getId()).append(",")
+				.append(p.get("middle").get(0).getId()).append(",")
+				.append(p.get("end").get(0).getId());
 
-				builder.append(pattern.get("start").get(0).getId()).append(",")
-					.append(pattern.get("middle").get(0).getId()).append(",")
-					.append(pattern.get("end").get(0).getId());
-
-				return builder.toString();
-			}
-		});
+			o.collect(builder.toString());
+		}, Types.STRING);
 
 		List<String> resultList = new ArrayList<>();
 
@@ -170,18 +167,14 @@ public class CEPITCase extends AbstractTestBase {
 				}
 			});
 
-		DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() {
-
-			@Override
-			public String select(Map<String, List<Event>> pattern) {
-				StringBuilder builder = new StringBuilder();
+		DataStream<String> result = CEP.pattern(input, pattern).select(p -> {
+			StringBuilder builder = new StringBuilder();
 
-				builder.append(pattern.get("start").get(0).getId()).append(",")
-					.append(pattern.get("middle").get(0).getId()).append(",")
-					.append(pattern.get("end").get(0).getId());
+			builder.append(p.get("start").get(0).getId()).append(",")
+				.append(p.get("middle").get(0).getId()).append(",")
+				.append(p.get("end").get(0).getId());
 
-				return builder.toString();
-			}
+			return builder.toString();
 		});
 
 		List<String> resultList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
index 6dcf766..95310b4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -78,7 +78,6 @@ public class Translate {
 			TranslateFunction.class,
 			0,
 			1,
-			new int[]{0},
 			new int[]{1},
 			oldType,
 			null,
@@ -162,7 +161,6 @@ public class Translate {
 			TranslateFunction.class,
 			0,
 			1,
-			new int[] {0},
 			new int[] {1},
 			oldType,
 			null,
@@ -248,7 +246,6 @@ public class Translate {
 			TranslateFunction.class,
 			0,
 			1,
-			new int[]{0},
 			new int[]{1},
 			oldType,
 			null,
@@ -332,7 +329,6 @@ public class Translate {
 			TranslateFunction.class,
 			0,
 			1,
-			new int[]{0},
 			new int[]{1},
 			oldType,
 			null,

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 0ca6eb9..33399f8 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -156,25 +156,6 @@ under the License.
 		<pluginManagement>
 			<plugins>
 
-				<!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
-				<!--
-				<plugin>
-					<artifactId>maven-compiler-plugin</artifactId>
-					<configuration>
-						<source>${java.version}</source>
-						<target>${java.version}</target>
-						<compilerId>jdt</compilerId>
-					</configuration>
-					<dependencies>
-						<dependency>
-							<groupId>org.eclipse.tycho</groupId>
-							<artifactId>tycho-compiler-jdt</artifactId>
-							<version>0.21.0</version>
-						</dependency>
-					</dependencies>
-				</plugin>
-				-->
-
 				<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
 				<plugin>
 					<groupId>org.eclipse.m2e</groupId>