You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/27 19:18:42 UTC

[1/2] flink git commit: [FLINK-6720] Activate strict checkstyle in flink-java8

Repository: flink
Updated Branches:
  refs/heads/master 77b0fb9fe -> 7355a59f4


[FLINK-6720] Activate strict checkstyle in flink-java8


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7085de4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7085de4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7085de4

Branch: refs/heads/master
Commit: b7085de440e0e29c010d242846a74d3fd923cde7
Parents: 77b0fb9
Author: zentol <ch...@apache.org>
Authored: Fri May 26 08:45:36 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat May 27 19:24:23 2017 +0200

----------------------------------------------------------------------
 flink-java8/pom.xml                             |  34 +++++
 .../examples/java8/relational/TPCHQuery10.java  | 124 +++++++++----------
 .../examples/java8/wordcount/WordCount.java     |  59 +++++----
 .../examples/java8/wordcount/WordCount.java     |  57 ++++-----
 .../java/type/lambdas/LambdaExtractionTest.java |  40 +++---
 .../org/apache/flink/cep/CEPLambdaTest.java     |  16 ++-
 .../runtime/util/JarFileCreatorLambdaTest.java  |  12 +-
 .../util/jartestprogram/FilterLambda1.java      |   4 +-
 .../util/jartestprogram/FilterLambda2.java      |   4 +
 .../util/jartestprogram/FilterLambda3.java      |   3 +
 .../util/jartestprogram/FilterLambda4.java      |   3 +
 .../util/jartestprogram/UtilFunction.java       |   3 +
 .../jartestprogram/UtilFunctionWrapper.java     |   6 +
 .../runtime/util/jartestprogram/WordFilter.java |   3 +
 .../lambdas/AllGroupReduceITCase.java           |   3 +
 .../javaApiOperators/lambdas/CoGroupITCase.java |  41 +++---
 .../javaApiOperators/lambdas/CrossITCase.java   |   7 +-
 .../javaApiOperators/lambdas/FilterITCase.java  |  53 ++++----
 .../lambdas/FlatJoinITCase.java                 |   9 +-
 .../javaApiOperators/lambdas/FlatMapITCase.java |   3 +
 .../lambdas/GroupReduceITCase.java              |  17 +--
 .../javaApiOperators/lambdas/JoinITCase.java    |   9 +-
 .../javaApiOperators/lambdas/MapITCase.java     |   5 +-
 .../javaApiOperators/lambdas/ReduceITCase.java  |  47 +++----
 24 files changed, 328 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index fb7da02..bfc9cdd 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -230,6 +230,40 @@ under the License.
 						</lifecycleMappingMetadata>
 					</configuration>
 				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-checkstyle-plugin</artifactId>
+					<version>2.17</version>
+					<dependencies>
+						<dependency>
+							<groupId>com.puppycrawl.tools</groupId>
+							<artifactId>checkstyle</artifactId>
+							<version>6.19</version>
+						</dependency>
+					</dependencies>
+					<configuration>
+						<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+						<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+						<includeTestSourceDirectory>true</includeTestSourceDirectory>
+						<logViolationsToConsole>true</logViolationsToConsole>
+						<failOnViolation>true</failOnViolation>
+					</configuration>
+					<executions>
+						<!--
+                        Execute checkstyle after compilation but before tests.
+    
+                        This ensures that any parsing or type checking errors are from
+                        javac, so they look as expected. Beyond that, we want to
+                        fail as early as possible.
+                        -->
+						<execution>
+							<phase>test-compile</phase>
+							<goals>
+								<goal>check</goal>
+							</goals>
+						</execution>
+					</executions>
+				</plugin>
 			</plugins>
 		</pluginManagement>
 	</build>

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
index 1ad4f41..c0fce4d 100644
--- a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
+++ b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
@@ -30,71 +30,65 @@ import org.apache.flink.api.java.tuple.Tuple6;
  * This program implements a modified version of the TPC-H query 10.
  * The original query can be found at
  * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
- * 
- * <p>
- * This program implements the following SQL equivalent:
- * 
- * <p>
- * <pre>{@code
- * SELECT 
+ *
+ * <p>This program implements the following SQL equivalent:
+ *
+ * <p><pre>{@code
+ * SELECT
  *        c_custkey,
- *        c_name, 
+ *        c_name,
  *        c_address,
- *        n_name, 
+ *        n_name,
  *        c_acctbal
- *        SUM(l_extendedprice * (1 - l_discount)) AS revenue,  
- * FROM   
- *        customer, 
- *        orders, 
- *        lineitem, 
- *        nation 
- * WHERE 
- *        c_custkey = o_custkey 
- *        AND l_orderkey = o_orderkey 
- *        AND YEAR(o_orderdate) > '1990' 
- *        AND l_returnflag = 'R' 
- *        AND c_nationkey = n_nationkey 
- * GROUP BY 
- *        c_custkey, 
- *        c_name, 
- *        c_acctbal, 
- *        n_name, 
+ *        SUM(l_extendedprice * (1 - l_discount)) AS revenue,
+ * FROM
+ *        customer,
+ *        orders,
+ *        lineitem,
+ *        nation
+ * WHERE
+ *        c_custkey = o_custkey
+ *        AND l_orderkey = o_orderkey
+ *        AND YEAR(o_orderdate) > '1990'
+ *        AND l_returnflag = 'R'
+ *        AND c_nationkey = n_nationkey
+ * GROUP BY
+ *        c_custkey,
+ *        c_name,
+ *        c_acctbal,
+ *        n_name,
  *        c_address
  * }</pre>
- *        
- * <p>
- * Compared to the original TPC-H query this version does not print 
+ *
+ * <p>Compared to the original TPC-H query this version does not print
  * c_phone and c_comment, only filters by years greater than 1990 instead of
  * a period of 3 months, and does not sort the result by revenue.
- * 
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ *
+ * <p>Input files are plain text CSV files using the pipe character ('|') as field separator
  * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- * 
- * <p>
- * Usage: <code>TPCHQuery10 &lt;customer-csv path&gt; &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;nation-csv path&gt; &lt;result path&gt;</code><br>
- *  
- * <p>
- * This example shows how to use:
+ *
+ * <p>Usage: <code>TPCHQuery10 &lt;customer-csv path&gt; &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;nation-csv path&gt; &lt;result path&gt;</code><br>
+ *
+ * <p>This example shows how to use:
  * <ul>
  * <li> inline-defined functions using Java 8 Lambda Expressions
  * </ul>
  */
 public class TPCHQuery10 {
-	
+
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
+
+		if (!parseParameters(args)) {
 			return;
 		}
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		// get customer data set: (custkey, name, address, nationkey, acctbal) 
+		// get customer data set: (custkey, name, address, nationkey, acctbal)
 		DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
 
 		// get orders data set: (orderkey, custkey, orderdate)
@@ -111,20 +105,20 @@ public class TPCHQuery10 {
 				// filter by year
 				orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
 				// project fields out that are no longer required
-				.project(0,1);
+				.project(0, 1);
 
 		// lineitems filtered by flag: (orderkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag = 
+		DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag =
 				// filter by flag
 				lineitems.filter(lineitem -> lineitem.f3.equals("R"))
 				// project fields out that are no longer required
-				.project(0,1,2);
+				.project(0, 1, 2);
 
 		// join orders with lineitems: (custkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey = 
+		DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey =
 				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
 									.where(0).equalTo(0)
-									.projectFirst(1).projectSecond(1,2);
+									.projectFirst(1).projectSecond(1, 2);
 
 		// aggregate for revenue: (custkey, revenue)
 		DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
@@ -138,36 +132,36 @@ public class TPCHQuery10 {
 		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
 						.joinWithTiny(nations)
 						.where(3).equalTo(0)
-						.projectFirst(0,1,2).projectSecond(1).projectFirst(4);
+						.projectFirst(0, 1, 2).projectSecond(1).projectFirst(4);
 
 		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
-		DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue = 
+		DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue =
 				customerWithNation.join(revenueOfCustomerKey)
 				.where(0).equalTo(0)
-				.projectFirst(0,1,2,3,4).projectSecond(1);
+				.projectFirst(0, 1, 2, 3, 4).projectSecond(1);
 
 		// emit result
 		customerWithRevenue.writeAsCsv(outputPath);
-		
+
 		// execute program
 		env.execute("TPCH Query 10 Example");
-		
+
 	}
-	
+
 	// *************************************************************************
 	//     UTIL METHODS
 	// *************************************************************************
-	
+
 	private static String customerPath;
 	private static String ordersPath;
 	private static String lineitemPath;
 	private static String nationPath;
 	private static String outputPath;
-	
+
 	private static boolean parseParameters(String[] programArguments) {
-		
-		if(programArguments.length > 0) {
-			if(programArguments.length == 5) {
+
+		if (programArguments.length > 0) {
+			if (programArguments.length == 5) {
 				customerPath = programArguments[0];
 				ordersPath = programArguments[1];
 				lineitemPath = programArguments[2];
@@ -180,20 +174,20 @@ public class TPCHQuery10 {
 		} else {
 			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
 								"  Due to legal restrictions, we can not ship generated data.\n" +
-								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + 
+								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
 								"  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
 			return false;
 		}
 		return true;
 	}
-	
+
 	private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
 		return env.readCsvFile(customerPath)
 					.fieldDelimiter("|")
 					.includeFields("11110100")
 					.types(Integer.class, String.class, String.class, Integer.class, Double.class);
 	}
-	
+
 	private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
 		return env.readCsvFile(ordersPath)
 					.fieldDelimiter("|")
@@ -207,12 +201,12 @@ public class TPCHQuery10 {
 					.includeFields("1000011010000000")
 					.types(Integer.class, Double.class, Double.class, String.class);
 	}
-	
+
 	private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
 		return env.readCsvFile(nationPath)
 					.fieldDelimiter("|")
 					.includeFields("1100")
 					.types(Integer.class, String.class);
 	}
-			
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
index aead125..0130dec 100644
--- a/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
+++ b/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
@@ -18,51 +18,48 @@
 
 package org.apache.flink.examples.java8.wordcount;
 
-import java.util.Arrays;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.util.Collector;
 
+import java.util.Arrays;
+
 /**
  * Implements the "WordCount" program that computes a simple word occurrence histogram
- * over text files. 
- * 
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * 
- * <p>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * over text files.
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
  * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * 
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
  * <ul>
  * <li>write a compact Flink program with Java 8 Lambda Expressions.
  * </ul>
- * 
+ *
  */
 public class WordCount {
-	
+
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
+
+		if (!parseParameters(args)) {
 			return;
 		}
-		
+
 		// set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		// get input data
 		DataSet<String> text = getTextDataSet(env);
-		
-		DataSet<Tuple2<String, Integer>> counts = 
+
+		DataSet<Tuple2<String, Integer>> counts =
 				// normalize and split each line
 				text.map(line -> line.toLowerCase().split("\\W+"))
 				// convert splitted line in pairs (2-tuples) containing: (word,1)
@@ -77,30 +74,30 @@ public class WordCount {
 				.sum(1);
 
 		// emit result
-		if(fileOutput) {
+		if (fileOutput) {
 			counts.writeAsCsv(outputPath, "\n", " ");
 		} else {
 			counts.print();
 		}
-		
+
 		// execute program
 		env.execute("WordCount Example");
 	}
-	
+
 	// *************************************************************************
 	//     UTIL METHODS
 	// *************************************************************************
-	
+
 	private static boolean fileOutput = false;
 	private static String textPath;
 	private static String outputPath;
-	
+
 	private static boolean parseParameters(String[] args) {
-		
-		if(args.length > 0) {
+
+		if (args.length > 0) {
 			// parse input arguments
 			fileOutput = true;
-			if(args.length == 2) {
+			if (args.length == 2) {
 				textPath = args[0];
 				outputPath = args[1];
 			} else {
@@ -114,9 +111,9 @@ public class WordCount {
 		}
 		return true;
 	}
-	
+
 	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
+		if (fileOutput) {
 			// read the text file from given input path
 			return env.readTextFile(textPath);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
index 80b0ce0..f991433 100644
--- a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
+++ b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
@@ -18,51 +18,48 @@
 
 package org.apache.flink.streaming.examples.java8.wordcount;
 
-import java.util.Arrays;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.Collector;
 
+import java.util.Arrays;
+
 /**
  * Implements the streaming "WordCount" program that computes a simple word occurrences
- * over text files. 
- * 
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * 
- * <p>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * over text files.
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
  * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * 
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
  * <ul>
  * <li>write a compact Flink Streaming program with Java 8 Lambda Expressions.
  * </ul>
- * 
+ *
  */
 public class WordCount {
-	
+
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
+
+		if (!parseParameters(args)) {
 			return;
 		}
-		
+
 		// set up the execution environment
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		
+
 		// get input data
 		DataStream<String> text = getTextDataStream(env);
-		
-		DataStream<Tuple2<String, Integer>> counts = 
+
+		DataStream<Tuple2<String, Integer>> counts =
 				// normalize and split each line
 				text.map(line -> line.toLowerCase().split("\\W+"))
 				// convert splitted line in pairs (2-tuples) containing: (word,1)
@@ -77,30 +74,30 @@ public class WordCount {
 				.sum(1);
 
 		// emit result
-		if(fileOutput) {
+		if (fileOutput) {
 			counts.writeAsCsv(outputPath);
 		} else {
 			counts.print();
 		}
-		
+
 		// execute program
 		env.execute("Streaming WordCount Example");
 	}
-	
+
 	// *************************************************************************
 	//     UTIL METHODS
 	// *************************************************************************
-	
+
 	private static boolean fileOutput = false;
 	private static String textPath;
 	private static String outputPath;
-	
+
 	private static boolean parseParameters(String[] args) {
-		
-		if(args.length > 0) {
+
+		if (args.length > 0) {
 			// parse input arguments
 			fileOutput = true;
-			if(args.length == 2) {
+			if (args.length == 2) {
 				textPath = args[0];
 				outputPath = args[1];
 			} else {
@@ -114,7 +111,7 @@ public class WordCount {
 		}
 		return true;
 	}
-	
+
 	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
 		if (fileOutput) {
 			// read the text file from given input path

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index 0d7415a..64ff605 100644
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.api.java.type.lambdas;
 
-import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import org.junit.Assert;
-
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -44,8 +36,18 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+
+import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests the type extractor for lambda functions.
+ */
 @SuppressWarnings("serial")
 public class LambdaExtractionTest {
 
@@ -54,12 +56,16 @@ public class LambdaExtractionTest {
 		try {
 			MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
 				@Override
-				public Integer map(String value) { return Integer.parseInt(value); }
+				public Integer map(String value) {
+					return Integer.parseInt(value);
+				}
 			};
 
 			MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
 				@Override
-				public Integer map(String value) { return Integer.parseInt(value); }
+				public Integer map(String value) {
+					return Integer.parseInt(value);
+				}
 			};
 
 			MapFunction<?, ?> fromProperClass = new StaticMapper();
@@ -90,19 +96,21 @@ public class LambdaExtractionTest {
 		}
 	}
 
-	public static class StaticMapper implements MapFunction<String, Integer> {
+	private static class StaticMapper implements MapFunction<String, Integer> {
 		@Override
-		public Integer map(String value) { return Integer.parseInt(value); }
+		public Integer map(String value) {
+			return Integer.parseInt(value);
+		}
 	}
 
-	public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
+	private interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
 		@Override
 		Tuple2<T, Long> map(T value) throws Exception;
 	}
 
 	private static final MapFunction<String, Integer> STATIC_LAMBDA = Integer::parseInt;
 
-	public static class MyClass {
+	private static class MyClass {
 		private String s = "mystring";
 
 		public MapFunction<Integer, String> getMapFunction() {
@@ -253,7 +261,7 @@ public class LambdaExtractionTest {
 		Assert.assertTrue(ti instanceof MissingTypeInfo);
 	}
 
-	public static class MyType {
+	private static class MyType {
 		private int key;
 
 		public int getKey() {
@@ -283,7 +291,7 @@ public class LambdaExtractionTest {
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
 	}
 
-	public static class MySubtype extends MyType {
+	private static class MySubtype extends MyType {
 		public boolean test;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
index 03fb3c6..4eff037 100644
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
@@ -33,15 +33,24 @@ import org.junit.Test;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for lambda support in CEP.
+ */
 public class CEPLambdaTest extends TestLogger {
+	/**
+	 * Test event class.
+	 */
 	public static class EventA {}
 
+	/**
+	 * Test event class.
+	 */
 	public static class EventB {}
 
 	/**
-	 * Tests that a Java8 lambda can be passed as a CEP select function
+	 * Tests that a Java8 lambda can be passed as a CEP select function.
 	 */
 	@Test
 	@Ignore
@@ -49,7 +58,6 @@ public class CEPLambdaTest extends TestLogger {
 		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
 		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
 
-
 		DataStream<EventA> inputStream = new DataStream<>(
 			StreamExecutionEnvironment.getExecutionEnvironment(),
 			new SourceTransformation<>(
@@ -70,7 +78,7 @@ public class CEPLambdaTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that a Java8 lambda can be passed as a CEP flat select function
+	 * Tests that a Java8 lambda can be passed as a CEP flat select function.
 	 */
 	@Test
 	@Ignore

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
index d90f096..ca11275 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.util;
 
-
 import org.apache.flink.runtime.util.jartestprogram.FilterLambda1;
 import org.apache.flink.runtime.util.jartestprogram.FilterLambda2;
 import org.apache.flink.runtime.util.jartestprogram.FilterLambda3;
@@ -34,9 +33,12 @@ import java.util.Set;
 import java.util.jar.JarInputStream;
 import java.util.zip.ZipEntry;
 
+/**
+ * Tests for the {@link JarFileCreator}.
+ */
 public class JarFileCreatorLambdaTest {
 	@Test
-	public void TestFilterFunctionOnLambda1() throws Exception {
+	public void testFilterFunctionOnLambda1() throws Exception {
 		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
 		JarFileCreator jfc = new JarFileCreator(out);
 		jfc.addClass(FilterLambda1.class)
@@ -51,7 +53,7 @@ public class JarFileCreatorLambdaTest {
 	}
 
 	@Test
-	public void TestFilterFunctionOnLambda2() throws Exception{
+	public void testFilterFunctionOnLambda2() throws Exception{
 		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
 		JarFileCreator jfc = new JarFileCreator(out);
 		jfc.addClass(FilterLambda2.class)
@@ -66,7 +68,7 @@ public class JarFileCreatorLambdaTest {
 	}
 
 	@Test
-	public void TestFilterFunctionOnLambda3() throws Exception {
+	public void testFilterFunctionOnLambda3() throws Exception {
 		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
 		JarFileCreator jfc = new JarFileCreator(out);
 		jfc.addClass(FilterLambda3.class)
@@ -82,7 +84,7 @@ public class JarFileCreatorLambdaTest {
 	}
 
 	@Test
-	public void TestFilterFunctionOnLambda4() throws Exception {
+	public void testFilterFunctionOnLambda4() throws Exception {
 		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
 		JarFileCreator jfc = new JarFileCreator(out);
 		jfc.addClass(FilterLambda4.class)

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
index cd0c9e7..12abff9 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.util.jartestprogram;
 
-
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
+/**
+ * A lambda filter using a static method.
+ */
 public class FilterLambda1 {
 
 	public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
index 06c279d..9555607 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.runtime.util.jartestprogram;
 
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
+/**
+ * Similar to {@link FilterLambda1}, but the filter lambda is directly passed to {@link DataSet#filter(FilterFunction)}.
+ */
 public class FilterLambda2 {
 
 	public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
index 27fd2d9..b493722 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.util.jartestprogram;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
+/**
+ * Similar to {@link FilterLambda2}, but uses a getter to retrieve a lambda filter instance.
+ */
 public class FilterLambda3 {
 
 	public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
index e66adb0..606ef5e 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.util.jartestprogram;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
+/**
+ * Similar to {@link FilterLambda3} with additional indirection.
+ */
 public class FilterLambda4 {
 
 	public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
index e662015..1d5394a 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.util.jartestprogram;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 
+/**
+ * Static factory for a lambda filter function.
+ */
 public class UtilFunction {
 	public static FilterFunction<String> getWordFilter() {
 		return (v) -> WordFilter.filter(v);

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
index f97cdd8..de8f68a 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
@@ -20,7 +20,13 @@ package org.apache.flink.runtime.util.jartestprogram;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 
+/**
+ * A wrapper around {@link WordFilter} to introduce additional indirection.
+ */
 public class UtilFunctionWrapper {
+	/**
+	 * Static factory for a lambda filter function.
+	 */
 	public static class UtilFunction {
 		public static FilterFunction<String> getWordFilter() {
 			return (v) -> WordFilter.filter(v);

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
index c6f833a..4a5b16f 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.util.jartestprogram;
 
+/**
+ * Static filter method for lambda tests.
+ */
 public class WordFilter {
 	public static boolean filter(String value) {
 		return !value.contains("not");

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
index 1420483..7dada56 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * IT cases for lambda allreduce functions.
+ */
 public class AllGroupReduceITCase extends JavaProgramTestBase {
 
 	private static final String EXPECTED_RESULT = "aaabacad\n";

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
index 667a786..30362cb 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * IT cases for lambda cogroup functions.
+ */
 public class CoGroupITCase extends JavaProgramTestBase {
 
 	private static final String EXPECTED_RESULT = "6\n3\n";
@@ -40,26 +43,26 @@ public class CoGroupITCase extends JavaProgramTestBase {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-				new Tuple2<Integer, String>(1, "hello"),
-				new Tuple2<Integer, String>(2, "what's"),
-				new Tuple2<Integer, String>(2, "up")
-				);
+			new Tuple2<Integer, String>(1, "hello"),
+			new Tuple2<Integer, String>(2, "what's"),
+			new Tuple2<Integer, String>(2, "up")
+		);
 		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-				new Tuple2<Integer, String>(1, "not"),
-				new Tuple2<Integer, String>(1, "much"),
-				new Tuple2<Integer, String>(2, "really")
-				);
+			new Tuple2<Integer, String>(1, "not"),
+			new Tuple2<Integer, String>(1, "much"),
+			new Tuple2<Integer, String>(2, "really")
+		);
 		DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
-				.with((values1, values2, out) -> {
-					int sum = 0;
-					for (Tuple2<Integer, String> next : values1) {
-						sum += next.f0;
-					}
-					for (Tuple2<Integer, String> next : values2) {
-						sum += next.f0;
-					}
-					out.collect(sum);
-				});
+			.with((values1, values2, out) -> {
+				int sum = 0;
+				for (Tuple2<Integer, String> next : values1) {
+					sum += next.f0;
+				}
+				for (Tuple2<Integer, String> next : values2) {
+					sum += next.f0;
+				}
+				out.collect(sum);
+			});
 		joined.writeAsText(resultPath);
 		env.execute();
 	}
@@ -68,4 +71,4 @@ public class CoGroupITCase extends JavaProgramTestBase {
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
index 60916c9..11a360e 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * IT cases for lambda cross functions.
+ */
 public class CrossITCase extends JavaProgramTestBase {
 
 	private static final String EXPECTED_RESULT = "2,hello not\n" +
@@ -57,8 +60,8 @@ public class CrossITCase extends JavaProgramTestBase {
 				new Tuple2<Integer, String>(1, "much"),
 				new Tuple2<Integer, String>(2, "really")
 				);
-		DataSet<Tuple2<Integer,String>> joined = left.cross(right)
-				.with((t,s) -> new Tuple2<Integer, String> (t.f0 + s.f0, t.f1 + " " + s.f1));
+		DataSet<Tuple2<Integer, String>> joined = left.cross(right)
+				.with((t, s) -> new Tuple2<> (t.f0 + s.f0, t.f1 + " " + s.f1));
 		joined.writeAsCsv(resultPath);
 		env.execute();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
index d83db06..e6bef71 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
@@ -18,15 +18,18 @@
 
 package org.apache.flink.test.javaApiOperators.lambdas;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * IT cases for lambda filter funtions.
+ */
 public class FilterITCase extends JavaProgramTestBase {
 
 	private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
@@ -35,27 +38,27 @@ public class FilterITCase extends JavaProgramTestBase {
 	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
 
 		List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
-		data.add(new Tuple3<Integer, Long, String>(1,1l,"Hi"));
-		data.add(new Tuple3<Integer, Long, String>(2,2l,"Hello"));
-		data.add(new Tuple3<Integer, Long, String>(3,2l,"Hello world"));
-		data.add(new Tuple3<Integer, Long, String>(4,3l,"Hello world, how are you?"));
-		data.add(new Tuple3<Integer, Long, String>(5,3l,"I am fine."));
-		data.add(new Tuple3<Integer, Long, String>(6,3l,"Luke Skywalker"));
-		data.add(new Tuple3<Integer, Long, String>(7,4l,"Comment#1"));
-		data.add(new Tuple3<Integer, Long, String>(8,4l,"Comment#2"));
-		data.add(new Tuple3<Integer, Long, String>(9,4l,"Comment#3"));
-		data.add(new Tuple3<Integer, Long, String>(10,4l,"Comment#4"));
-		data.add(new Tuple3<Integer, Long, String>(11,5l,"Comment#5"));
-		data.add(new Tuple3<Integer, Long, String>(12,5l,"Comment#6"));
-		data.add(new Tuple3<Integer, Long, String>(13,5l,"Comment#7"));
-		data.add(new Tuple3<Integer, Long, String>(14,5l,"Comment#8"));
-		data.add(new Tuple3<Integer, Long, String>(15,5l,"Comment#9"));
-		data.add(new Tuple3<Integer, Long, String>(16,6l,"Comment#10"));
-		data.add(new Tuple3<Integer, Long, String>(17,6l,"Comment#11"));
-		data.add(new Tuple3<Integer, Long, String>(18,6l,"Comment#12"));
-		data.add(new Tuple3<Integer, Long, String>(19,6l,"Comment#13"));
-		data.add(new Tuple3<Integer, Long, String>(20,6l,"Comment#14"));
-		data.add(new Tuple3<Integer, Long, String>(21,6l,"Comment#15"));
+		data.add(new Tuple3<>(1, 1L, "Hi"));
+		data.add(new Tuple3<>(2, 2L, "Hello"));
+		data.add(new Tuple3<>(3, 2L, "Hello world"));
+		data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
+		data.add(new Tuple3<>(5, 3L, "I am fine."));
+		data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
+		data.add(new Tuple3<>(7, 4L, "Comment#1"));
+		data.add(new Tuple3<>(8, 4L, "Comment#2"));
+		data.add(new Tuple3<>(9, 4L, "Comment#3"));
+		data.add(new Tuple3<>(10, 4L, "Comment#4"));
+		data.add(new Tuple3<>(11, 5L, "Comment#5"));
+		data.add(new Tuple3<>(12, 5L, "Comment#6"));
+		data.add(new Tuple3<>(13, 5L, "Comment#7"));
+		data.add(new Tuple3<>(14, 5L, "Comment#8"));
+		data.add(new Tuple3<>(15, 5L, "Comment#9"));
+		data.add(new Tuple3<>(16, 6L, "Comment#10"));
+		data.add(new Tuple3<>(17, 6L, "Comment#11"));
+		data.add(new Tuple3<>(18, 6L, "Comment#12"));
+		data.add(new Tuple3<>(19, 6L, "Comment#13"));
+		data.add(new Tuple3<>(20, 6L, "Comment#14"));
+		data.add(new Tuple3<>(21, 6L, "Comment#15"));
 
 		Collections.shuffle(data);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
index 714c14c..e35278d 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * IT cases for lambda join functions.
+ */
 public class FlatJoinITCase extends JavaProgramTestBase {
 
 	private static final String EXPECTED_RESULT = "2,what's really\n" +
@@ -52,8 +55,8 @@ public class FlatJoinITCase extends JavaProgramTestBase {
 				new Tuple2<Integer, String>(1, "much"),
 				new Tuple2<Integer, String>(2, "really")
 				);
-		DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
-				.with((t,s,out) -> out.collect(new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1)));
+		DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
+				.with((t, s, out) -> out.collect(new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1)));
 		joined.writeAsCsv(resultPath);
 		env.execute();
 	}
@@ -62,4 +65,4 @@ public class FlatJoinITCase extends JavaProgramTestBase {
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
index 2b0e344..b5211c8 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * IT cases for lambda flatmap functions.
+ */
 public class FlatMapITCase extends JavaProgramTestBase {
 
 	private static final String EXPECTED_RESULT = "bb\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
index 23300c8..61061b8 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * IT cases for lambda groupreduce functions.
+ */
 public class GroupReduceITCase extends JavaProgramTestBase {
 
 	private static final String EXPECTED_RESULT = "abad\n" +
@@ -40,17 +43,17 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 	protected void testProgram() throws Exception {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		DataSet<Tuple2<Integer,String>> stringDs = env.fromElements(
-				new Tuple2<Integer,String>(1, "aa"),
-				new Tuple2<Integer,String>(2, "ab"),
-				new Tuple2<Integer,String>(1, "ac"),
-				new Tuple2<Integer,String>(2, "ad")
+		DataSet<Tuple2<Integer, String>> stringDs = env.fromElements(
+				new Tuple2<>(1, "aa"),
+				new Tuple2<>(2, "ab"),
+				new Tuple2<>(1, "ac"),
+				new Tuple2<>(2, "ad")
 				);
 		DataSet<String> concatDs = stringDs
 				.groupBy(0)
 				.reduceGroup((values, out) -> {
 					String conc = "";
-					for (Tuple2<Integer,String> next : values) {
+					for (Tuple2<Integer, String> next : values) {
 						conc = conc.concat(next.f1);
 					}
 					out.collect(conc);
@@ -63,4 +66,4 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 	protected void postSubmit() throws Exception {
 		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
index aef35ac..0e78212 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
@@ -23,8 +23,11 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * IT cases for lambda join functions.
+ */
 public class JoinITCase extends JavaProgramTestBase {
-	
+
 	private static final String EXPECTED_RESULT = "2,what's really\n" +
 			"2,up really\n" +
 			"1,hello not\n" +
@@ -52,8 +55,8 @@ public class JoinITCase extends JavaProgramTestBase {
 				new Tuple2<Integer, String>(1, "much"),
 				new Tuple2<Integer, String>(2, "really")
 				);
-		DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
-				.with((t,s) -> new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1));
+		DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
+				.with((t, s) -> new Tuple2<>(t.f0, t.f1 + " " + s.f1));
 		joined.writeAsCsv(resultPath);
 		env.execute();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
index 87c1fa5..355d38d 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -22,9 +22,12 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+/**
+ * IT cases for lambda map functions.
+ */
 public class MapITCase extends JavaProgramTestBase {
 
-	public static class Trade {
+	private static class Trade {
 
 		public String v;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
index 52c215f..24b39d7 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.test.javaApiOperators.lambdas;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -29,6 +25,13 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * IT cases for lambda reduce functions.
+ */
 public class ReduceITCase extends JavaProgramTestBase {
 
 	private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
@@ -41,25 +44,25 @@ public class ReduceITCase extends JavaProgramTestBase {
 			"5,11,10,GHI,1\n" +
 			"5,29,0,P-),2\n" +
 			"5,25,0,P-),3\n";
-	
+
 	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
 
 		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(1,1l,0,"Hallo",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,2l,1,"Hallo Welt",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,3l,2,"Hallo Welt wie",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,4l,3,"Hallo Welt wie gehts?",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,5l,4,"ABC",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,6l,5,"BCD",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,7l,6,"CDE",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,8l,7,"DEF",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,9l,8,"EFG",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,10l,9,"FGH",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,11l,10,"GHI",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,12l,11,"HIJ",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,13l,12,"IJK",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,14l,13,"JKL",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,15l,14,"KLM",2l));
+		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
 
 		Collections.shuffle(data);
 
@@ -74,9 +77,9 @@ public class ReduceITCase extends JavaProgramTestBase {
 
 		return env.fromCollection(data, type);
 	}
-	
+
 	private String resultPath;
-	
+
 	@Override
 	protected void preSubmit() throws Exception {
 		resultPath = getTempDirPath("result");


[2/2] flink git commit: [FLINK-6720] Rename java8 javaApiOperators package to api.java.operators

Posted by ch...@apache.org.
[FLINK-6720] Rename java8 javaApiOperators package to api.java.operators

This closes #3999.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7355a59f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7355a59f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7355a59f

Branch: refs/heads/master
Commit: 7355a59f48fcda834a256f7925e00c66312494ed
Parents: b7085de
Author: zentol <ch...@apache.org>
Authored: Fri May 26 09:22:44 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat May 27 19:24:26 2017 +0200

----------------------------------------------------------------------
 .../operators/lambdas/AllGroupReduceITCase.java |  59 ++++++++++
 .../java/operators/lambdas/CoGroupITCase.java   |  74 +++++++++++++
 .../api/java/operators/lambdas/CrossITCase.java |  73 +++++++++++++
 .../java/operators/lambdas/FilterITCase.java    |  91 ++++++++++++++++
 .../java/operators/lambdas/FlatJoinITCase.java  |  68 ++++++++++++
 .../java/operators/lambdas/FlatMapITCase.java   |  56 ++++++++++
 .../operators/lambdas/GroupReduceITCase.java    |  69 ++++++++++++
 .../api/java/operators/lambdas/JoinITCase.java  |  69 ++++++++++++
 .../api/java/operators/lambdas/MapITCase.java   |  74 +++++++++++++
 .../java/operators/lambdas/ReduceITCase.java    | 109 +++++++++++++++++++
 .../lambdas/AllGroupReduceITCase.java           |  59 ----------
 .../javaApiOperators/lambdas/CoGroupITCase.java |  74 -------------
 .../javaApiOperators/lambdas/CrossITCase.java   |  73 -------------
 .../javaApiOperators/lambdas/FilterITCase.java  |  91 ----------------
 .../lambdas/FlatJoinITCase.java                 |  68 ------------
 .../javaApiOperators/lambdas/FlatMapITCase.java |  56 ----------
 .../lambdas/GroupReduceITCase.java              |  69 ------------
 .../javaApiOperators/lambdas/JoinITCase.java    |  69 ------------
 .../javaApiOperators/lambdas/MapITCase.java     |  74 -------------
 .../javaApiOperators/lambdas/ReduceITCase.java  | 109 -------------------
 20 files changed, 742 insertions(+), 742 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
new file mode 100644
index 0000000..cee34af
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda allreduce functions.
+ */
+public class AllGroupReduceITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "aaabacad\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+		DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
+			String conc = "";
+			for (String s : values) {
+				conc = conc.concat(s);
+			}
+			out.collect(conc);
+		});
+		concatDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
new file mode 100644
index 0000000..a70f37a
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda cogroup functions.
+ */
+public class CoGroupITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "6\n3\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+			new Tuple2<Integer, String>(1, "hello"),
+			new Tuple2<Integer, String>(2, "what's"),
+			new Tuple2<Integer, String>(2, "up")
+		);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+			new Tuple2<Integer, String>(1, "not"),
+			new Tuple2<Integer, String>(1, "much"),
+			new Tuple2<Integer, String>(2, "really")
+		);
+		DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
+			.with((values1, values2, out) -> {
+				int sum = 0;
+				for (Tuple2<Integer, String> next : values1) {
+					sum += next.f0;
+				}
+				for (Tuple2<Integer, String> next : values2) {
+					sum += next.f0;
+				}
+				out.collect(sum);
+			});
+		joined.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
new file mode 100644
index 0000000..32cd910
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda cross functions.
+ */
+public class CrossITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "2,hello not\n" +
+			"3,what's not\n" +
+			"3,up not\n" +
+			"2,hello much\n" +
+			"3,what's much\n" +
+			"3,up much\n" +
+			"3,hello really\n" +
+			"4,what's really\n" +
+			"4,up really";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+				);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+				new Tuple2<Integer, String>(1, "not"),
+				new Tuple2<Integer, String>(1, "much"),
+				new Tuple2<Integer, String>(2, "really")
+				);
+		DataSet<Tuple2<Integer, String>> joined = left.cross(right)
+				.with((t, s) -> new Tuple2<> (t.f0 + s.f0, t.f1 + " " + s.f1));
+		joined.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
new file mode 100644
index 0000000..345b119
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * IT cases for lambda filter funtions.
+ */
+public class FilterITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n";
+
+	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
+		data.add(new Tuple3<>(1, 1L, "Hi"));
+		data.add(new Tuple3<>(2, 2L, "Hello"));
+		data.add(new Tuple3<>(3, 2L, "Hello world"));
+		data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
+		data.add(new Tuple3<>(5, 3L, "I am fine."));
+		data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
+		data.add(new Tuple3<>(7, 4L, "Comment#1"));
+		data.add(new Tuple3<>(8, 4L, "Comment#2"));
+		data.add(new Tuple3<>(9, 4L, "Comment#3"));
+		data.add(new Tuple3<>(10, 4L, "Comment#4"));
+		data.add(new Tuple3<>(11, 5L, "Comment#5"));
+		data.add(new Tuple3<>(12, 5L, "Comment#6"));
+		data.add(new Tuple3<>(13, 5L, "Comment#7"));
+		data.add(new Tuple3<>(14, 5L, "Comment#8"));
+		data.add(new Tuple3<>(15, 5L, "Comment#9"));
+		data.add(new Tuple3<>(16, 6L, "Comment#10"));
+		data.add(new Tuple3<>(17, 6L, "Comment#11"));
+		data.add(new Tuple3<>(18, 6L, "Comment#12"));
+		data.add(new Tuple3<>(19, 6L, "Comment#13"));
+		data.add(new Tuple3<>(20, 6L, "Comment#14"));
+		data.add(new Tuple3<>(21, 6L, "Comment#15"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+				filter(value -> value.f2.contains("world"));
+		filterDs.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
new file mode 100644
index 0000000..f793450
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda join functions.
+ */
+public class FlatJoinITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "2,what's really\n" +
+			"2,up really\n" +
+			"1,hello not\n" +
+			"1,hello much\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+				);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+				new Tuple2<Integer, String>(1, "not"),
+				new Tuple2<Integer, String>(1, "much"),
+				new Tuple2<Integer, String>(2, "really")
+				);
+		DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
+				.with((t, s, out) -> out.collect(new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1)));
+		joined.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
new file mode 100644
index 0000000..d395d7d
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda flatmap functions.
+ */
+public class FlatMapITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "bb\n" +
+			"bb\n" +
+			"bc\n" +
+			"bd\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+		DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
+		flatMappedDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
new file mode 100644
index 0000000..53db541
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda groupreduce functions.
+ */
+public class GroupReduceITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "abad\n" +
+			"aaac\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> stringDs = env.fromElements(
+				new Tuple2<>(1, "aa"),
+				new Tuple2<>(2, "ab"),
+				new Tuple2<>(1, "ac"),
+				new Tuple2<>(2, "ad")
+				);
+		DataSet<String> concatDs = stringDs
+				.groupBy(0)
+				.reduceGroup((values, out) -> {
+					String conc = "";
+					for (Tuple2<Integer, String> next : values) {
+						conc = conc.concat(next.f1);
+					}
+					out.collect(conc);
+				});
+		concatDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
new file mode 100644
index 0000000..d86ea49
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda join functions.
+ */
+public class JoinITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "2,what's really\n" +
+			"2,up really\n" +
+			"1,hello not\n" +
+			"1,hello much\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+				);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+				new Tuple2<Integer, String>(1, "not"),
+				new Tuple2<Integer, String>(1, "much"),
+				new Tuple2<Integer, String>(2, "really")
+				);
+		DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
+				.with((t, s) -> new Tuple2<>(t.f0, t.f1 + " " + s.f1));
+		joined.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
new file mode 100644
index 0000000..15a9b9d
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda map functions.
+ */
+public class MapITCase extends JavaProgramTestBase {
+
+	private static class Trade {
+
+		public String v;
+
+		public Trade(String v) {
+			this.v = v;
+		}
+
+		@Override
+		public String toString() {
+			return v;
+		}
+	}
+
+	private static final String EXPECTED_RESULT = "22\n" +
+			"22\n" +
+			"23\n" +
+			"24\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
+		DataSet<String> mappedDs = stringDs
+			.map(Object::toString)
+			.map (s -> s.replace("1", "2"))
+			.map(Trade::new)
+			.map(Trade::toString);
+		mappedDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
new file mode 100644
index 0000000..712132c
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * IT cases for lambda reduce functions.
+ */
+public class ReduceITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
+			"2,3,2,Hallo Welt wie,1\n" +
+			"2,2,1,Hallo Welt,2\n" +
+			"3,9,0,P-),2\n" +
+			"3,6,5,BCD,3\n" +
+			"4,17,0,P-),1\n" +
+			"4,17,0,P-),2\n" +
+			"5,11,10,GHI,1\n" +
+			"5,29,0,P-),2\n" +
+			"5,25,0,P-),3\n";
+
+	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
+		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
+
+		Collections.shuffle(data);
+
+		TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> type = new
+				TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+				.groupBy(4, 0)
+				.reduce((in1, in2) -> {
+					Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
+					out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
+					return out;
+				});
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
deleted file mode 100644
index 7dada56..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda allreduce functions.
- */
-public class AllGroupReduceITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "aaabacad\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
-		DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
-			String conc = "";
-			for (String s : values) {
-				conc = conc.concat(s);
-			}
-			out.collect(conc);
-		});
-		concatDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
deleted file mode 100644
index 30362cb..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cogroup functions.
- */
-public class CoGroupITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "6\n3\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-			new Tuple2<Integer, String>(1, "hello"),
-			new Tuple2<Integer, String>(2, "what's"),
-			new Tuple2<Integer, String>(2, "up")
-		);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-			new Tuple2<Integer, String>(1, "not"),
-			new Tuple2<Integer, String>(1, "much"),
-			new Tuple2<Integer, String>(2, "really")
-		);
-		DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
-			.with((values1, values2, out) -> {
-				int sum = 0;
-				for (Tuple2<Integer, String> next : values1) {
-					sum += next.f0;
-				}
-				for (Tuple2<Integer, String> next : values2) {
-					sum += next.f0;
-				}
-				out.collect(sum);
-			});
-		joined.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
deleted file mode 100644
index 11a360e..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cross functions.
- */
-public class CrossITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "2,hello not\n" +
-			"3,what's not\n" +
-			"3,up not\n" +
-			"2,hello much\n" +
-			"3,what's much\n" +
-			"3,up much\n" +
-			"3,hello really\n" +
-			"4,what's really\n" +
-			"4,up really";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-				new Tuple2<Integer, String>(1, "hello"),
-				new Tuple2<Integer, String>(2, "what's"),
-				new Tuple2<Integer, String>(2, "up")
-				);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-				new Tuple2<Integer, String>(1, "not"),
-				new Tuple2<Integer, String>(1, "much"),
-				new Tuple2<Integer, String>(2, "really")
-				);
-		DataSet<Tuple2<Integer, String>> joined = left.cross(right)
-				.with((t, s) -> new Tuple2<> (t.f0 + s.f0, t.f1 + " " + s.f1));
-		joined.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
deleted file mode 100644
index e6bef71..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda filter funtions.
- */
-public class FilterITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n";
-
-	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
-		data.add(new Tuple3<>(1, 1L, "Hi"));
-		data.add(new Tuple3<>(2, 2L, "Hello"));
-		data.add(new Tuple3<>(3, 2L, "Hello world"));
-		data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
-		data.add(new Tuple3<>(5, 3L, "I am fine."));
-		data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
-		data.add(new Tuple3<>(7, 4L, "Comment#1"));
-		data.add(new Tuple3<>(8, 4L, "Comment#2"));
-		data.add(new Tuple3<>(9, 4L, "Comment#3"));
-		data.add(new Tuple3<>(10, 4L, "Comment#4"));
-		data.add(new Tuple3<>(11, 5L, "Comment#5"));
-		data.add(new Tuple3<>(12, 5L, "Comment#6"));
-		data.add(new Tuple3<>(13, 5L, "Comment#7"));
-		data.add(new Tuple3<>(14, 5L, "Comment#8"));
-		data.add(new Tuple3<>(15, 5L, "Comment#9"));
-		data.add(new Tuple3<>(16, 6L, "Comment#10"));
-		data.add(new Tuple3<>(17, 6L, "Comment#11"));
-		data.add(new Tuple3<>(18, 6L, "Comment#12"));
-		data.add(new Tuple3<>(19, 6L, "Comment#13"));
-		data.add(new Tuple3<>(20, 6L, "Comment#14"));
-		data.add(new Tuple3<>(21, 6L, "Comment#15"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-				filter(value -> value.f2.contains("world"));
-		filterDs.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
deleted file mode 100644
index e35278d..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class FlatJoinITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "2,what's really\n" +
-			"2,up really\n" +
-			"1,hello not\n" +
-			"1,hello much\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-				new Tuple2<Integer, String>(1, "hello"),
-				new Tuple2<Integer, String>(2, "what's"),
-				new Tuple2<Integer, String>(2, "up")
-				);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-				new Tuple2<Integer, String>(1, "not"),
-				new Tuple2<Integer, String>(1, "much"),
-				new Tuple2<Integer, String>(2, "really")
-				);
-		DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
-				.with((t, s, out) -> out.collect(new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1)));
-		joined.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
deleted file mode 100644
index b5211c8..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda flatmap functions.
- */
-public class FlatMapITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "bb\n" +
-			"bb\n" +
-			"bc\n" +
-			"bd\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
-		DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
-		flatMappedDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
deleted file mode 100644
index 61061b8..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda groupreduce functions.
- */
-public class GroupReduceITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "abad\n" +
-			"aaac\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> stringDs = env.fromElements(
-				new Tuple2<>(1, "aa"),
-				new Tuple2<>(2, "ab"),
-				new Tuple2<>(1, "ac"),
-				new Tuple2<>(2, "ad")
-				);
-		DataSet<String> concatDs = stringDs
-				.groupBy(0)
-				.reduceGroup((values, out) -> {
-					String conc = "";
-					for (Tuple2<Integer, String> next : values) {
-						conc = conc.concat(next.f1);
-					}
-					out.collect(conc);
-				});
-		concatDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
deleted file mode 100644
index 0e78212..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class JoinITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "2,what's really\n" +
-			"2,up really\n" +
-			"1,hello not\n" +
-			"1,hello much\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Integer, String>> left = env.fromElements(
-				new Tuple2<Integer, String>(1, "hello"),
-				new Tuple2<Integer, String>(2, "what's"),
-				new Tuple2<Integer, String>(2, "up")
-				);
-		DataSet<Tuple2<Integer, String>> right = env.fromElements(
-				new Tuple2<Integer, String>(1, "not"),
-				new Tuple2<Integer, String>(1, "much"),
-				new Tuple2<Integer, String>(2, "really")
-				);
-		DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
-				.with((t, s) -> new Tuple2<>(t.f0, t.f1 + " " + s.f1));
-		joined.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
deleted file mode 100644
index 355d38d..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda map functions.
- */
-public class MapITCase extends JavaProgramTestBase {
-
-	private static class Trade {
-
-		public String v;
-
-		public Trade(String v) {
-			this.v = v;
-		}
-
-		@Override
-		public String toString() {
-			return v;
-		}
-	}
-
-	private static final String EXPECTED_RESULT = "22\n" +
-			"22\n" +
-			"23\n" +
-			"24\n";
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
-		DataSet<String> mappedDs = stringDs
-			.map(Object::toString)
-			.map (s -> s.replace("1", "2"))
-			.map(Trade::new)
-			.map(Trade::toString);
-		mappedDs.writeAsText(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
deleted file mode 100644
index 24b39d7..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda reduce functions.
- */
-public class ReduceITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
-			"2,3,2,Hallo Welt wie,1\n" +
-			"2,2,1,Hallo Welt,2\n" +
-			"3,9,0,P-),2\n" +
-			"3,6,5,BCD,3\n" +
-			"4,17,0,P-),1\n" +
-			"4,17,0,P-),2\n" +
-			"5,11,10,GHI,1\n" +
-			"5,29,0,P-),2\n" +
-			"5,25,0,P-),3\n";
-
-	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
-		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
-		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
-		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
-		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
-		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
-		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
-		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
-		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
-		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
-		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
-		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
-		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
-		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
-		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
-		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
-
-		Collections.shuffle(data);
-
-		TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> type = new
-				TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>>(
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO,
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
-				.groupBy(4, 0)
-				.reduce((in1, in2) -> {
-					Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
-					out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
-					return out;
-				});
-
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}