You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/20 06:07:21 UTC

[2/3] flink git commit: [FLINK-7251] [types] Remove the flink-java8 module and improve lambda type extraction

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
deleted file mode 100644
index b9dba77..0000000
--- a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.java8.wordcount;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-
-import java.util.Arrays;
-
-/**
- * Implements the streaming "WordCount" program that computes a simple word occurrences
- * over text files.
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>write a compact Flink Streaming program with Java 8 Lambda Expressions.
- * </ul>
- *
- */
-public class WordCount {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataStream<String> text = getTextDataStream(env);
-
-		DataStream<Tuple2<String, Integer>> counts =
-				// normalize and split each line
-				text.map(line -> line.toLowerCase().split("\\W+"))
-				// convert split line in pairs (2-tuples) containing: (word,1)
-				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
-					// emit the pairs with non-zero-length words
-					Arrays.stream(tokens)
-					.filter(t -> t.length() > 0)
-					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
-				})
-				// group by the tuple field "0" and sum up tuple field "1"
-				.keyBy(0)
-				.sum(1);
-
-		// emit result
-		if (fileOutput) {
-			counts.writeAsCsv(outputPath);
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("Streaming WordCount Example");
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return env.fromElements(WordCountData.WORDS);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
deleted file mode 100644
index de1f395..0000000
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.type.lambdas;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.lang.reflect.Method;
-
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-/**
- * Tests the type extractor for lambda functions.
- */
-@SuppressWarnings("serial")
-public class LambdaExtractionTest {
-
-	private static final TypeInformation<Tuple2<Tuple1<Integer>, Boolean>> NESTED_TUPLE_BOOLEAN_TYPE =
-			new TypeHint<Tuple2<Tuple1<Integer>, Boolean>>(){}.getTypeInfo();
-
-	private static final TypeInformation<Tuple2<Tuple1<Integer>, Double>> NESTED_TUPLE_DOUBLE_TYPE =
-			new TypeHint<Tuple2<Tuple1<Integer>, Double>>(){}.getTypeInfo();
-
-	@Test
-	public void testIdentifyLambdas() {
-		try {
-			MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
-				@Override
-				public Integer map(String value) {
-					return Integer.parseInt(value);
-				}
-			};
-
-			MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
-				@Override
-				public Integer map(String value) {
-					return Integer.parseInt(value);
-				}
-			};
-
-			MapFunction<?, ?> fromProperClass = new StaticMapper();
-
-			MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
-				@Override
-				public Tuple2<Integer, Long> map(Integer value) {
-					return new Tuple2<>(value, 1L);
-				}
-			};
-
-			MapFunction<String, Integer> staticLambda = Integer::parseInt;
-			MapFunction<Integer, String> instanceLambda = Object::toString;
-			MapFunction<String, Integer> constructorLambda = Integer::new;
-
-			assertNull(checkAndExtractLambda(anonymousFromInterface));
-			assertNull(checkAndExtractLambda(anonymousFromClass));
-			assertNull(checkAndExtractLambda(fromProperClass));
-			assertNull(checkAndExtractLambda(fromDerived));
-			assertNotNull(checkAndExtractLambda(staticLambda));
-			assertNotNull(checkAndExtractLambda(instanceLambda));
-			assertNotNull(checkAndExtractLambda(constructorLambda));
-			assertNotNull(checkAndExtractLambda(STATIC_LAMBDA));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	private static class StaticMapper implements MapFunction<String, Integer> {
-		@Override
-		public Integer map(String value) {
-			return Integer.parseInt(value);
-		}
-	}
-
-	private interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
-		@Override
-		Tuple2<T, Long> map(T value) throws Exception;
-	}
-
-	private static final MapFunction<String, Integer> STATIC_LAMBDA = Integer::parseInt;
-
-	private static class MyClass {
-		private String s = "mystring";
-
-		public MapFunction<Integer, String> getMapFunction() {
-			return (i) -> s;
-		}
-	}
-
-	@Test
-	public void testLambdaWithMemberVariable() {
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), Types.INT);
-		Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
-	}
-
-	@Test
-	public void testLambdaWithLocalVariable() {
-		String s = "mystring";
-		final int k = 24;
-		int j = 26;
-
-		MapFunction<Integer, String> f = (i) -> s + k + j;
-
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, Types.INT);
-		Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
-	}
-
-	@Test
-	public void testMapLambda() {
-		MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
-
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testFlatMapLambda() {
-		FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
-		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testMapPartitionLambda() {
-		MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
-		TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testGroupReduceLambda() {
-		GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-
-		TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testFlatJoinLambda() {
-		FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
-
-		TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testJoinLambda() {
-		JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
-
-		TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testCoGroupLambda() {
-		CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
-
-		TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testCrossLambda() {
-		CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
-
-		TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@Test
-	public void testKeySelectorLambda() {
-		KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
-
-		TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
-		if (!(ti instanceof MissingTypeInfo)) {
-			Assert.assertTrue(ti.isTupleType());
-			Assert.assertEquals(2, ti.getArity());
-			Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
-			Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-		}
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testLambdaTypeErasure() {
-		MapFunction<Tuple1<Integer>, Tuple1> f = (i) -> null;
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, new TypeHint<Tuple1<Integer>>(){}.getTypeInfo(), null, true);
-		Assert.assertTrue(ti instanceof MissingTypeInfo);
-	}
-
-	@Test
-	public void testPartitionerLambda() {
-		Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % numPartitions;
-		final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner);
-
-		Assert.assertTrue(ti.isTupleType());
-		Assert.assertEquals(2, ti.getArity());
-		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(0), BasicTypeInfo.INT_TYPE_INFO);
-		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
-
-	}
-
-	private static class MyType {
-		private int key;
-
-		public int getKey() {
-			return key;
-		}
-
-		public void setKey(int key) {
-			this.key = key;
-		}
-
-		protected int getKey2() {
-			return 0;
-		}
-	}
-
-	@Test
-	public void testInstanceMethodRefSameType() {
-		MapFunction<MyType, Integer> f = MyType::getKey;
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MyType.class));
-		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
-	}
-
-	@Test
-	public void testInstanceMethodRefSuperType() {
-		MapFunction<Integer, String> f = Object::toString;
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.INT_TYPE_INFO);
-		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
-	}
-
-	private static class MySubtype extends MyType {
-		public boolean test;
-	}
-
-	@Test
-	public void testInstanceMethodRefSuperTypeProtected() {
-		MapFunction<MySubtype, Integer> f = MyType::getKey2;
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MySubtype.class));
-		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
-	}
-
-	@Test
-	public void testConstructorMethodRef() {
-		MapFunction<String, Integer> f = Integer::new;
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.STRING_TYPE_INFO);
-		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
-	}
-
-	private interface InterfaceWithDefaultMethod {
-		void samMethod();
-
-		default void defaultMethod() {
-
-		}
-	}
-
-	@Test
-	public void testSamMethodExtractionInterfaceWithDefaultMethod() {
-		final Method sam = TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithDefaultMethod.class);
-		assertNotNull(sam);
-		assertEquals("samMethod", sam.getName());
-	}
-
-	private interface InterfaceWithMultipleMethods {
-		void firstMethod();
-
-		void secondMethod();
-	}
-
-	@Test(expected = InvalidTypesException.class)
-	public void getSingleAbstractMethodMultipleMethods() throws Exception {
-		TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithMultipleMethods.class);
-	}
-
-	private interface InterfaceWithoutAbstractMethod {
-		default void defaultMethod() {
-
-		}
-	}
-
-	@Test(expected = InvalidTypesException.class)
-	public void getSingleAbstractMethodNoAbstractMethods() throws Exception {
-		TypeExtractionUtils.getSingleAbstractMethod(InterfaceWithoutAbstractMethod.class);
-	}
-
-	private abstract class AbstractClassWithSingleAbstractMethod {
-		public abstract void defaultMethod();
-	}
-
-	@Test(expected = InvalidTypesException.class)
-	public void getSingleAbstractMethodNotAnInterface() throws Exception {
-		TypeExtractionUtils.getSingleAbstractMethod(AbstractClassWithSingleAbstractMethod.class);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
deleted file mode 100644
index 7cbdf6a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for lambda support in CEP.
- */
-public class CEPLambdaTest extends TestLogger {
-	/**
-	 * Test event class.
-	 */
-	public static class EventA {}
-
-	/**
-	 * Test event class.
-	 */
-	public static class EventB {}
-
-	/**
-	 * Tests that a Java8 lambda can be passed as a CEP select function.
-	 */
-	@Test
-	public void testLambdaSelectFunction() {
-		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
-		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
-
-		DataStream<EventA> inputStream = new DataStream<>(
-			StreamExecutionEnvironment.getExecutionEnvironment(),
-			new SourceTransformation<>(
-				"source",
-				null,
-				eventTypeInformation,
-				1));
-
-		Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
-
-		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, dummyPattern);
-
-		DataStream<EventB> result = patternStream.select(
-				(Map<String, List<EventA>> map) -> new EventB()
-		);
-
-		assertEquals(outputTypeInformation, result.getType());
-	}
-
-	/**
-	 * Tests that a Java8 lambda can be passed as a CEP flat select function.
-	 */
-	@Test
-	public void testLambdaFlatSelectFunction() {
-		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
-		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
-
-		DataStream<EventA> inputStream = new DataStream<>(
-			StreamExecutionEnvironment.getExecutionEnvironment(),
-			new SourceTransformation<>(
-				"source",
-				null,
-				eventTypeInformation,
-				1));
-
-		Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
-
-		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, dummyPattern);
-
-		DataStream<EventB> result = patternStream.flatSelect(
-			(Map<String, List<EventA>> map, Collector<EventB> collector) -> collector.collect(new EventB())
-		);
-
-		assertEquals(outputTypeInformation, result.getType());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
deleted file mode 100644
index ca11275..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda1;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda2;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda3;
-import org.apache.flink.runtime.util.jartestprogram.FilterLambda4;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.jar.JarInputStream;
-import java.util.zip.ZipEntry;
-
-/**
- * Tests for the {@link JarFileCreator}.
- */
-public class JarFileCreatorLambdaTest {
-	@Test
-	public void testFilterFunctionOnLambda1() throws Exception {
-		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
-		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(FilterLambda1.class)
-			.createJarFile();
-
-		Set<String> ans = new HashSet<String>();
-		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda1.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-
-		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
-		out.delete();
-	}
-
-	@Test
-	public void testFilterFunctionOnLambda2() throws Exception{
-		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
-		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(FilterLambda2.class)
-			.createJarFile();
-
-		Set<String> ans = new HashSet<String>();
-		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda2.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-
-		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
-		out.delete();
-	}
-
-	@Test
-	public void testFilterFunctionOnLambda3() throws Exception {
-		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
-		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(FilterLambda3.class)
-			.createJarFile();
-
-		Set<String> ans = new HashSet<String>();
-		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda3.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunction.class");
-
-		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
-		out.delete();
-	}
-
-	@Test
-	public void testFilterFunctionOnLambda4() throws Exception {
-		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
-		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(FilterLambda4.class)
-			.createJarFile();
-
-		Set<String> ans = new HashSet<String>();
-		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda4.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
-		ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper$UtilFunction.class");
-
-		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
-		out.delete();
-	}
-
-	public boolean validate(Set<String> expected, File out) throws Exception {
-		int count = expected.size();
-		try (JarInputStream jis = new JarInputStream(new FileInputStream(out))) {
-			ZipEntry ze;
-			while ((ze = jis.getNextEntry()) != null) {
-				count--;
-				expected.remove(ze.getName());
-			}
-		}
-		return count == 0 && expected.size() == 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
deleted file mode 100644
index 12abff9..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * A lambda filter using a static method.
- */
-public class FilterLambda1 {
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
-		FilterFunction<String> filter = (v) -> WordFilter.filter(v);
-
-		DataSet<String> output = input.filter(filter);
-		output.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
deleted file mode 100644
index 9555607..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda1}, but the filter lambda is directly passed to {@link DataSet#filter(FilterFunction)}.
- */
-public class FilterLambda2 {
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
-		DataSet<String> output = input.filter((v) -> WordFilter.filter(v));
-		output.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
deleted file mode 100644
index b493722..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda2}, but uses a getter to retrieve a lambda filter instance.
- */
-public class FilterLambda3 {
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
-		DataSet<String> output = input.filter(UtilFunction.getWordFilter());
-		output.print();
-
-		env.execute();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
deleted file mode 100644
index 606ef5e..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Similar to {@link FilterLambda3} with additional indirection.
- */
-public class FilterLambda4 {
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
-
-		DataSet<String> output = input.filter(UtilFunctionWrapper.UtilFunction.getWordFilter());
-		output.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
deleted file mode 100644
index 1d5394a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * Static factory for a lambda filter function.
- */
-public class UtilFunction {
-	public static FilterFunction<String> getWordFilter() {
-		return (v) -> WordFilter.filter(v);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
deleted file mode 100644
index de8f68a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * A wrapper around {@link WordFilter} to introduce additional indirection.
- */
-public class UtilFunctionWrapper {
-	/**
-	 * Static factory for a lambda filter function.
-	 */
-	public static class UtilFunction {
-		public static FilterFunction<String> getWordFilter() {
-			return (v) -> WordFilter.filter(v);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
deleted file mode 100644
index 4a5b16f..0000000
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util.jartestprogram;
-
-/**
- * Static filter method for lambda tests.
- */
-public class WordFilter {
-	public static boolean filter(String value) {
-		return !value.contains("not");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
deleted file mode 100644
index cee34af..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda allreduce functions.
- */
-public class AllGroupReduceITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "aaabacad\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
-		DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
-			String conc = "";
-			for (String s : values) {
-				conc = conc.concat(s);
-			}
-			out.collect(conc);
-		});
-		concatDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
deleted file mode 100644
index a70f37a..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cogroup functions.
- */
-public class CoGroupITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "6\n3\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-			new Tuple2<Integer, String>(1, "hello"),
-			new Tuple2<Integer, String>(2, "what's"),
-			new Tuple2<Integer, String>(2, "up")
-		);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-			new Tuple2<Integer, String>(1, "not"),
-			new Tuple2<Integer, String>(1, "much"),
-			new Tuple2<Integer, String>(2, "really")
-		);
-		DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
-			.with((values1, values2, out) -> {
-				int sum = 0;
-				for (Tuple2<Integer, String> next : values1) {
-					sum += next.f0;
-				}
-				for (Tuple2<Integer, String> next : values2) {
-					sum += next.f0;
-				}
-				out.collect(sum);
-			});
-		joined.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
deleted file mode 100644
index 32cd910..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cross functions.
- */
-public class CrossITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "2,hello not\n" +
-			"3,what's not\n" +
-			"3,up not\n" +
-			"2,hello much\n" +
-			"3,what's much\n" +
-			"3,up much\n" +
-			"3,hello really\n" +
-			"4,what's really\n" +
-			"4,up really";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-				new Tuple2<Integer, String>(1, "hello"),
-				new Tuple2<Integer, String>(2, "what's"),
-				new Tuple2<Integer, String>(2, "up")
-				);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-				new Tuple2<Integer, String>(1, "not"),
-				new Tuple2<Integer, String>(1, "much"),
-				new Tuple2<Integer, String>(2, "really")
-				);
-		DataSet<Tuple2<Integer, String>> joined = left.cross(right)
-				.with((t, s) -> new Tuple2<> (t.f0 + s.f0, t.f1 + " " + s.f1));
-		joined.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
deleted file mode 100644
index 6ad1058..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda filter functions.
- */
-public class FilterITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n";
-
-	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
-		data.add(new Tuple3<>(1, 1L, "Hi"));
-		data.add(new Tuple3<>(2, 2L, "Hello"));
-		data.add(new Tuple3<>(3, 2L, "Hello world"));
-		data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
-		data.add(new Tuple3<>(5, 3L, "I am fine."));
-		data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
-		data.add(new Tuple3<>(7, 4L, "Comment#1"));
-		data.add(new Tuple3<>(8, 4L, "Comment#2"));
-		data.add(new Tuple3<>(9, 4L, "Comment#3"));
-		data.add(new Tuple3<>(10, 4L, "Comment#4"));
-		data.add(new Tuple3<>(11, 5L, "Comment#5"));
-		data.add(new Tuple3<>(12, 5L, "Comment#6"));
-		data.add(new Tuple3<>(13, 5L, "Comment#7"));
-		data.add(new Tuple3<>(14, 5L, "Comment#8"));
-		data.add(new Tuple3<>(15, 5L, "Comment#9"));
-		data.add(new Tuple3<>(16, 6L, "Comment#10"));
-		data.add(new Tuple3<>(17, 6L, "Comment#11"));
-		data.add(new Tuple3<>(18, 6L, "Comment#12"));
-		data.add(new Tuple3<>(19, 6L, "Comment#13"));
-		data.add(new Tuple3<>(20, 6L, "Comment#14"));
-		data.add(new Tuple3<>(21, 6L, "Comment#15"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-				filter(value -> value.f2.contains("world"));
-		filterDs.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
deleted file mode 100644
index f793450..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class FlatJoinITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "2,what's really\n" +
-			"2,up really\n" +
-			"1,hello not\n" +
-			"1,hello much\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-				new Tuple2<Integer, String>(1, "hello"),
-				new Tuple2<Integer, String>(2, "what's"),
-				new Tuple2<Integer, String>(2, "up")
-				);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-				new Tuple2<Integer, String>(1, "not"),
-				new Tuple2<Integer, String>(1, "much"),
-				new Tuple2<Integer, String>(2, "really")
-				);
-		DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
-				.with((t, s, out) -> out.collect(new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1)));
-		joined.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
deleted file mode 100644
index d395d7d..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda flatmap functions.
- */
-public class FlatMapITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "bb\n" +
-			"bb\n" +
-			"bc\n" +
-			"bd\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
-		DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
-		flatMappedDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
deleted file mode 100644
index 53db541..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda groupreduce functions.
- */
-public class GroupReduceITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "abad\n" +
-			"aaac\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> stringDs = env.fromElements(
-				new Tuple2<>(1, "aa"),
-				new Tuple2<>(2, "ab"),
-				new Tuple2<>(1, "ac"),
-				new Tuple2<>(2, "ad")
-				);
-		DataSet<String> concatDs = stringDs
-				.groupBy(0)
-				.reduceGroup((values, out) -> {
-					String conc = "";
-					for (Tuple2<Integer, String> next : values) {
-						conc = conc.concat(next.f1);
-					}
-					out.collect(conc);
-				});
-		concatDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
deleted file mode 100644
index d86ea49..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class JoinITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "2,what's really\n" +
-			"2,up really\n" +
-			"1,hello not\n" +
-			"1,hello much\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-				new Tuple2<Integer, String>(1, "hello"),
-				new Tuple2<Integer, String>(2, "what's"),
-				new Tuple2<Integer, String>(2, "up")
-				);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-				new Tuple2<Integer, String>(1, "not"),
-				new Tuple2<Integer, String>(1, "much"),
-				new Tuple2<Integer, String>(2, "really")
-				);
-		DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
-				.with((t, s) -> new Tuple2<>(t.f0, t.f1 + " " + s.f1));
-		joined.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
deleted file mode 100644
index 15a9b9d..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda map functions.
- */
-public class MapITCase extends JavaProgramTestBase {
-
-	private static class Trade {
-
-		public String v;
-
-		public Trade(String v) {
-			this.v = v;
-		}
-
-		@Override
-		public String toString() {
-			return v;
-		}
-	}
-
-	private static final String EXPECTED_RESULT = "22\n" +
-			"22\n" +
-			"23\n" +
-			"24\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
-		DataSet<String> mappedDs = stringDs
-			.map(Object::toString)
-			.map (s -> s.replace("1", "2"))
-			.map(Trade::new)
-			.map(Trade::toString);
-		mappedDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
deleted file mode 100644
index 712132c..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.api.java.operators.lambdas;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda reduce functions.
- */
-public class ReduceITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
-			"2,3,2,Hallo Welt wie,1\n" +
-			"2,2,1,Hallo Welt,2\n" +
-			"3,9,0,P-),2\n" +
-			"3,6,5,BCD,3\n" +
-			"4,17,0,P-),1\n" +
-			"4,17,0,P-),2\n" +
-			"5,11,10,GHI,1\n" +
-			"5,29,0,P-),2\n" +
-			"5,25,0,P-),3\n";
-
-	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
-		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
-		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
-		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
-		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
-		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
-		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
-		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
-		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
-		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
-		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
-		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
-		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
-		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
-		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
-		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
-
-		Collections.shuffle(data);
-
-		TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> type = new
-				TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>>(
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO,
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
-				.groupBy(4, 0)
-				.reduce((in1, in2) -> {
-					Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
-					out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
-					return out;
-				});
-
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-java8/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/resources/log4j-test.properties b/flink-java8/src/test/resources/log4j-test.properties
deleted file mode 100644
index c977d4c..0000000
--- a/flink-java8/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 0e6c2fe..521665f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -104,8 +104,7 @@ public class PatternStream<T> {
 			PatternSelectFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
-			new int[]{},
+			TypeExtractor.NO_INDEX,
 			inputStream.getType(),
 			null,
 			false);
@@ -173,8 +172,7 @@ public class PatternStream<T> {
 			PatternSelectFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
-			new int[]{},
+			TypeExtractor.NO_INDEX,
 			inputStream.getType(),
 			null,
 			false);
@@ -259,8 +257,7 @@ public class PatternStream<T> {
 			PatternSelectFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
-			new int[]{},
+			TypeExtractor.NO_INDEX,
 			inputStream.getType(),
 			null,
 			false);
@@ -270,8 +267,7 @@ public class PatternStream<T> {
 			PatternTimeoutFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
-			new int[]{},
+			TypeExtractor.NO_INDEX,
 			inputStream.getType(),
 			null,
 			false);
@@ -314,7 +310,6 @@ public class PatternStream<T> {
 			PatternFlatSelectFunction.class,
 			0,
 			1,
-			new int[] {0, 1, 0},
 			new int[] {1, 0},
 			inputStream.getType(),
 			null,
@@ -381,7 +376,6 @@ public class PatternStream<T> {
 			PatternFlatSelectFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
 			new int[]{1, 0},
 			inputStream.getType(),
 			null,
@@ -465,7 +459,6 @@ public class PatternStream<T> {
 			PatternFlatTimeoutFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
 			new int[]{2, 0},
 			inputStream.getType(),
 			null,
@@ -476,7 +469,6 @@ public class PatternStream<T> {
 			PatternFlatSelectFunction.class,
 			0,
 			1,
-			new int[]{0, 1, 0},
 			new int[]{1, 0},
 			inputStream.getType(),
 			null,

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 6d1013c..e397d31 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
@@ -96,19 +97,15 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		});
 
-		DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() {
+		DataStream<String> result = CEP.pattern(input, pattern).flatSelect((p, o) -> {
+			StringBuilder builder = new StringBuilder();
 
-			@Override
-			public String select(Map<String, List<Event>> pattern) {
-				StringBuilder builder = new StringBuilder();
+			builder.append(p.get("start").get(0).getId()).append(",")
+				.append(p.get("middle").get(0).getId()).append(",")
+				.append(p.get("end").get(0).getId());
 
-				builder.append(pattern.get("start").get(0).getId()).append(",")
-					.append(pattern.get("middle").get(0).getId()).append(",")
-					.append(pattern.get("end").get(0).getId());
-
-				return builder.toString();
-			}
-		});
+			o.collect(builder.toString());
+		}, Types.STRING);
 
 		List<String> resultList = new ArrayList<>();
 
@@ -170,18 +167,14 @@ public class CEPITCase extends AbstractTestBase {
 				}
 			});
 
-		DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() {
-
-			@Override
-			public String select(Map<String, List<Event>> pattern) {
-				StringBuilder builder = new StringBuilder();
+		DataStream<String> result = CEP.pattern(input, pattern).select(p -> {
+			StringBuilder builder = new StringBuilder();
 
-				builder.append(pattern.get("start").get(0).getId()).append(",")
-					.append(pattern.get("middle").get(0).getId()).append(",")
-					.append(pattern.get("end").get(0).getId());
+			builder.append(p.get("start").get(0).getId()).append(",")
+				.append(p.get("middle").get(0).getId()).append(",")
+				.append(p.get("end").get(0).getId());
 
-				return builder.toString();
-			}
+			return builder.toString();
 		});
 
 		List<String> resultList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
index 6dcf766..95310b4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -78,7 +78,6 @@ public class Translate {
 			TranslateFunction.class,
 			0,
 			1,
-			new int[]{0},
 			new int[]{1},
 			oldType,
 			null,
@@ -162,7 +161,6 @@ public class Translate {
 			TranslateFunction.class,
 			0,
 			1,
-			new int[] {0},
 			new int[] {1},
 			oldType,
 			null,
@@ -248,7 +246,6 @@ public class Translate {
 			TranslateFunction.class,
 			0,
 			1,
-			new int[]{0},
 			new int[]{1},
 			oldType,
 			null,
@@ -332,7 +329,6 @@ public class Translate {
 			TranslateFunction.class,
 			0,
 			1,
-			new int[]{0},
 			new int[]{1},
 			oldType,
 			null,

http://git-wip-us.apache.org/repos/asf/flink/blob/ddba1b69/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 0ca6eb9..33399f8 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -156,25 +156,6 @@ under the License.
 		<pluginManagement>
 			<plugins>
 
-				<!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
-				<!--
-				<plugin>
-					<artifactId>maven-compiler-plugin</artifactId>
-					<configuration>
-						<source>${java.version}</source>
-						<target>${java.version}</target>
-						<compilerId>jdt</compilerId>
-					</configuration>
-					<dependencies>
-						<dependency>
-							<groupId>org.eclipse.tycho</groupId>
-							<artifactId>tycho-compiler-jdt</artifactId>
-							<version>0.21.0</version>
-						</dependency>
-					</dependencies>
-				</plugin>
-				-->
-
 				<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
 				<plugin>
 					<groupId>org.eclipse.m2e</groupId>