Posted to commits@flink.apache.org by al...@apache.org on 2014/09/26 10:07:21 UTC

[3/4] [FLINK-1062] Type Extraction for Lambdas
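
For context, this part of the patch adds the flink-java8 module with examples and tests that exercise the new lambda type extraction. Below is a minimal sketch of the extraction pattern, closely mirroring the LambdaExtractionTest added further down in this diff; the class name LambdaTypeSketch is illustrative, while the Flink calls (TypeExtractor.getMapReturnTypes, TypeInfoParser.parse) are the ones shown in the patch itself.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParser;

public class LambdaTypeSketch {

	public static void main(String[] args) {
		// A Java 8 lambda implementing Flink's MapFunction interface.
		MapFunction<Integer, String> f = (i) -> "mystring" + i;

		// Ask the TypeExtractor for the lambda's return type, given the input
		// type (parsed from a type string, exactly as the test below does).
		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Integer"));

		// With this patch the lambda's return type is resolved to a String.
		System.out.println(ti.equals(BasicTypeInfo.STRING_TYPE_INFO));
	}
}

The limits of the new extraction are also covered by the tests: in testLambdaTypeErasureException below, a lambda over raw Tuple1 types leaves the generic parameters erased, and the extractor still raises an InvalidTypesException.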

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
deleted file mode 100644
index b8344a9..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-@SuppressWarnings("serial")
-public class ReduceITCase extends JavaProgramTestBase {
-
-	private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
-			"2,3,2,Hallo Welt wie,1\n" +
-			"2,2,1,Hallo Welt,2\n" +
-			"3,9,0,P-),2\n" +
-			"3,6,5,BCD,3\n" +
-			"4,17,0,P-),1\n" +
-			"4,17,0,P-),2\n" +
-			"5,11,10,GHI,1\n" +
-			"5,29,0,P-),2\n" +
-			"5,25,0,P-),3\n";
-	
-	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(1,1l,0,"Hallo",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,2l,1,"Hallo Welt",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,3l,2,"Hallo Welt wie",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,4l,3,"Hallo Welt wie gehts?",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,5l,4,"ABC",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,6l,5,"BCD",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,7l,6,"CDE",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,8l,7,"DEF",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,9l,8,"EFG",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,10l,9,"FGH",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,11l,10,"GHI",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,12l,11,"HIJ",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,13l,12,"IJK",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,14l,13,"JKL",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,15l,14,"KLM",2l));
-
-		Collections.shuffle(data);
-
-		TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> type = new
-				TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>>(
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO,
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-	
-	private String resultPath;
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
-				.groupBy(4, 0)
-				.reduce((in1, in2) -> {
-					Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
-					out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
-					return out;
-				});
-
-		reduceDs.writeAsCsv(resultPath);
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
new file mode 100644
index 0000000..e733af5
--- /dev/null
+++ b/flink-java8/pom.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.7-incubating-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-java8</artifactId>
+	<name>flink-java8</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.7</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<!-- just define the Java version to be used for compiling and plugins -->
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version><!--$NO-MVN-MAN-VER$-->
+				<configuration>
+					<source>1.8</source>
+					<target>1.8</target>
+					<!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<systemPropertyVariables>
+						<log.level>WARN</log.level>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-failsafe-plugin</artifactId>
+				<configuration>
+					<systemPropertyVariables>
+						<log.level>WARN</log.level>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
+		</plugins>
+		
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
+					<artifactId>maven-compiler-plugin</artifactId>
+					<configuration>
+						<source>1.8</source>
+						<target>1.8</target>
+						<compilerId>jdt</compilerId>
+					</configuration>
+					<dependencies>
+						<!-- This dependency provides the implementation of compiler "jdt": -->
+						<dependency>
+							<groupId>org.eclipse.tycho</groupId>
+							<artifactId>tycho-compiler-jdt</artifactId>
+							<version>0.21.0</version>
+						</dependency>
+					</dependencies>
+				</plugin>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-assembly-plugin</artifactId>
+										<versionRange>[2.4,)</versionRange>
+										<goals>
+											<goal>single</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-compiler-plugin</artifactId>
+										<versionRange>[3.1,)</versionRange>
+										<goals>
+											<goal>testCompile</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java b/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
new file mode 100644
index 0000000..9b67a43
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java8.relational;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+
+/**
+ * This program implements a modified version of the TPC-H query 10.
+ * The original query can be found at
+ * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
+ * 
+ * <p>
+ * This program implements the following SQL equivalent:
+ * 
+ * <p>
+ * <code><pre>
+ * SELECT 
+ *        c_custkey,
+ *        c_name, 
+ *        c_address,
+ *        n_name, 
+ *        c_acctbal,
+ *        SUM(l_extendedprice * (1 - l_discount)) AS revenue
+ * FROM   
+ *        customer, 
+ *        orders, 
+ *        lineitem, 
+ *        nation 
+ * WHERE 
+ *        c_custkey = o_custkey 
+ *        AND l_orderkey = o_orderkey 
+ *        AND YEAR(o_orderdate) > '1990' 
+ *        AND l_returnflag = 'R' 
+ *        AND c_nationkey = n_nationkey 
+ * GROUP BY 
+ *        c_custkey, 
+ *        c_name, 
+ *        c_acctbal, 
+ *        n_name, 
+ *        c_address
+ * </pre></code>
+ *        
+ * <p>
+ * Compared to the original TPC-H query this version does not print 
+ * c_phone and c_comment, only filters by years greater than 1990 instead of
+ * a period of 3 months, and does not sort the result by revenue.
+ * 
+ * <p>
+ * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
+ * 
+ * <p>
+ * Usage: <code>TPCHQuery10 &lt;customer-csv path&gt; &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;nation-csv path&gt; &lt;result path&gt;</code><br>
+ *  
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li> inline-defined functions using Java 8 Lambda Expressions
+ * </ul>
+ */
+public class TPCHQuery10 {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get customer data set: (custkey, name, address, nationkey, acctbal) 
+		DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
+
+		// get orders data set: (orderkey, custkey, orderdate)
+		DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
+
+		// get lineitem data set: (orderkey, extendedprice, discount, returnflag)
+		DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
+
+		// get nation data set: (nationkey, name)
+		DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
+
+		// orders filtered by year: (orderkey, custkey)
+		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
+				// filter by year
+				orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
+				// project fields out that are no longer required
+				.project(0,1).types(Integer.class, Integer.class);
+
+		// lineitems filtered by flag: (orderkey, extendedprice, discount)
+		DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag = 
+				// filter by flag
+				lineitems.filter(lineitem -> lineitem.f3.equals("R"))
+				// project fields out that are no longer required
+				.project(0,1,2).types(Integer.class, Double.class, Double.class);
+
+		// join orders with lineitems: (custkey, extendedprice, discount)
+		DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey = 
+				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
+									.where(0).equalTo(0)
+									.projectFirst(1).projectSecond(1,2)
+									.types(Integer.class, Double.class, Double.class);
+
+		// aggregate for revenue: (custkey, revenue)
+		DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
+				// calculate the revenue for each item
+				// revenue per item = l_extendedprice * (1 - l_discount)
+				.map(i -> new Tuple2<>(i.f0, i.f1 * (1 - i.f2)))
+				// aggregate the revenues per item to revenue per customer
+				.groupBy(0).sum(1);
+
+		// join customer with nation (custkey, name, address, nationname, acctbal)
+		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
+						.joinWithTiny(nations)
+						.where(3).equalTo(0)
+						.projectFirst(0,1,2).projectSecond(1).projectFirst(4)
+						.types(Integer.class, String.class, String.class, String.class, Double.class);
+
+		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
+		DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue = 
+				customerWithNation.join(revenueOfCustomerKey)
+				.where(0).equalTo(0)
+				.projectFirst(0,1,2,3,4).projectSecond(1)
+				.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
+
+		// emit result
+		customerWithRevenue.writeAsCsv(outputPath);
+		
+		// execute program
+		env.execute("TPCH Query 10 Example");
+		
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static String customerPath;
+	private static String ordersPath;
+	private static String lineitemPath;
+	private static String nationPath;
+	private static String outputPath;
+	
+	private static boolean parseParameters(String[] programArguments) {
+		
+		if(programArguments.length > 0) {
+			if(programArguments.length == 5) {
+				customerPath = programArguments[0];
+				ordersPath = programArguments[1];
+				lineitemPath = programArguments[2];
+				nationPath = programArguments[3];
+				outputPath = programArguments[4];
+			} else {
+				System.err.println("Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
+				return false;
+			}
+		} else {
+			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+								"  Due to legal restrictions, we can not ship generated data.\n" +
+								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + 
+								"  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
+			return false;
+		}
+		return true;
+	}
+	
+	private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(customerPath)
+					.fieldDelimiter('|')
+					.includeFields("11110100")
+					.types(Integer.class, String.class, String.class, Integer.class, Double.class);
+	}
+	
+	private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(ordersPath)
+					.fieldDelimiter('|')
+					.includeFields("110010000")
+					.types(Integer.class, Integer.class, String.class);
+	}
+
+	private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(lineitemPath)
+					.fieldDelimiter('|')
+					.includeFields("1000011010000000")
+					.types(Integer.class, Double.class, Double.class, String.class);
+	}
+	
+	private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(nationPath)
+					.fieldDelimiter('|')
+					.includeFields("1100")
+					.types(Integer.class, String.class);
+	}
+			
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
new file mode 100644
index 0000000..793962f
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java8.wordcount;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.example.java8.wordcount.util.WordCountData;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over text files. 
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a compact Flink program with Java 8 Lambda Expressions.
+ * </ul>
+ * 
+ */
+public class WordCount {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		DataSet<String> text = getTextDataSet(env);
+		
+		DataSet<Tuple2<String, Integer>> counts = 
+				// normalize and split each line
+				text.map(line -> line.toLowerCase().split("\\W+"))
+				// convert the split line into pairs (2-tuples) containing: (word,1)
+				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
+					// emit the pairs with non-zero-length words
+					Arrays.stream(tokens)
+					.filter(t -> t.length() > 0)
+					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
+				})
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.sum(1);
+
+		// emit result
+		if(fileOutput) {
+			counts.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			counts.print();
+		}
+		
+		// execute program
+		env.execute("WordCount Example");
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+	
+	private static boolean parseParameters(String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+	
+	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return WordCountData.getDefaultTextLineDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
new file mode 100644
index 0000000..9933696
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java8.wordcount.util;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Provides the default data sets used for the WordCount example program.
+ * The default data sets are used if no parameters are given to the program.
+ *
+ */
+public class WordCountData {
+
+	public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
+		
+		return env.fromElements(
+				"To be, or not to be,--that is the question:--",
+				"Whether 'tis nobler in the mind to suffer",
+				"The slings and arrows of outrageous fortune",
+				"Or to take arms against a sea of troubles,",
+				"And by opposing end them?--To die,--to sleep,--",
+				"No more; and by a sleep to say we end",
+				"The heartache, and the thousand natural shocks",
+				"That flesh is heir to,--'tis a consummation",
+				"Devoutly to be wish'd. To die,--to sleep;--",
+				"To sleep! perchance to dream:--ay, there's the rub;",
+				"For in that sleep of death what dreams may come,",
+				"When we have shuffled off this mortal coil,",
+				"Must give us pause: there's the respect",
+				"That makes calamity of so long life;",
+				"For who would bear the whips and scorns of time,",
+				"The oppressor's wrong, the proud man's contumely,",
+				"The pangs of despis'd love, the law's delay,",
+				"The insolence of office, and the spurns",
+				"That patient merit of the unworthy takes,",
+				"When he himself might his quietus make",
+				"With a bare bodkin? who would these fardels bear,",
+				"To grunt and sweat under a weary life,",
+				"But that the dread of something after death,--",
+				"The undiscover'd country, from whose bourn",
+				"No traveller returns,--puzzles the will,",
+				"And makes us rather bear those ills we have",
+				"Than fly to others that we know not of?",
+				"Thus conscience does make cowards of us all;",
+				"And thus the native hue of resolution",
+				"Is sicklied o'er with the pale cast of thought;",
+				"And enterprises of great pith and moment,",
+				"With this regard, their currents turn awry,",
+				"And lose the name of action.--Soft you now!",
+				"The fair Ophelia!--Nymph, in thy orisons",
+				"Be all my sins remember'd."
+				);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
new file mode 100644
index 0000000..fa85f8c
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.type.lambdas;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import junit.framework.Assert;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class LambdaExtractionTest {
+
+	@Test
+	public void testIdentifyLambdas() {
+		try {
+			MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
+				@Override
+				public Integer map(String value) { return Integer.parseInt(value); }
+			};
+			
+			MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
+				@Override
+				public Integer map(String value) { return Integer.parseInt(value); }
+			};
+			
+			MapFunction<?, ?> fromProperClass = new StaticMapper();
+			
+			MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
+				@Override
+				public Tuple2<Integer, Long> map(Integer value) {
+					return new Tuple2<Integer, Long>(value, 1L);
+				}
+			};
+			
+			MapFunction<String, Integer> lambda = (str) -> Integer.parseInt(str);
+			
+			assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromInterface));
+			assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromClass));
+			assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromProperClass));
+			assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromDerived));
+			assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(lambda));
+			assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(STATIC_LAMBDA));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	public static class StaticMapper implements MapFunction<String, Integer> {
+
+		@Override
+		public Integer map(String value) { return Integer.parseInt(value); }
+	}
+	
+	public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
+
+		@Override
+		public Tuple2<T, Long> map(T value) throws Exception;
+	}
+	
+	private static final MapFunction<String, Integer> STATIC_LAMBDA = (str) -> Integer.parseInt(str);
+	
+	public static class MyClass {
+		private String s = "mystring";
+		
+		public MapFunction<Integer, String> getMapFunction() {
+			return (i) -> s;
+		}
+	}
+	
+	@Test
+	public void testLambdaWithMemberVariable() {		
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), TypeInfoParser.parse("Integer"));
+		Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testLambdaWithLocalVariable() {
+		String s = "mystring";
+		final int k = 24;
+		int j = 26;
+		
+		MapFunction<Integer, String> f = (i) -> s + k + j;
+		
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Integer"));
+		Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testMapLambda() {
+		MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
+		
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testFlatMapLambda() {
+		FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
+		
+		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testMapPartitionLambda() {
+		MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
+		
+		TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testGroupReduceLambda() {
+		GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
+		
+		TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testFlatJoinLambda() {
+		FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
+		
+		TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testJoinLambda() {
+		JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
+		
+		TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testCoGroupLambda() {
+		CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
+		
+		TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testCrossLambda() {
+		CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
+		
+		TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@Test
+	public void testKeySelectorLambda() {
+		KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
+		
+		TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+	}
+	
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testLambdaTypeErasureException() {
+		MapFunction<Tuple1, Tuple1> f = (i) -> null;
+		
+		try {
+			TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple1<String>"));
+			Assert.fail();
+		}
+		catch (InvalidTypesException e) {
+			// ok
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
new file mode 100644
index 0000000..1420483
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class AllGroupReduceITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "aaabacad\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+		DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
+			String conc = "";
+			for (String s : values) {
+				conc = conc.concat(s);
+			}
+			out.collect(conc);
+		});
+		concatDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
new file mode 100644
index 0000000..667a786
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class CoGroupITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "6\n3\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+				);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+				new Tuple2<Integer, String>(1, "not"),
+				new Tuple2<Integer, String>(1, "much"),
+				new Tuple2<Integer, String>(2, "really")
+				);
+		DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
+				.with((values1, values2, out) -> {
+					int sum = 0;
+					for (Tuple2<Integer, String> next : values1) {
+						sum += next.f0;
+					}
+					for (Tuple2<Integer, String> next : values2) {
+						sum += next.f0;
+					}
+					out.collect(sum);
+				});
+		joined.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
new file mode 100644
index 0000000..60916c9
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class CrossITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "2,hello not\n" +
+			"3,what's not\n" +
+			"3,up not\n" +
+			"2,hello much\n" +
+			"3,what's much\n" +
+			"3,up much\n" +
+			"3,hello really\n" +
+			"4,what's really\n" +
+			"4,up really";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+				);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+				new Tuple2<Integer, String>(1, "not"),
+				new Tuple2<Integer, String>(1, "much"),
+				new Tuple2<Integer, String>(2, "really")
+				);
+		DataSet<Tuple2<Integer,String>> joined = left.cross(right)
+				.with((t,s) -> new Tuple2<Integer, String> (t.f0 + s.f0, t.f1 + " " + s.f1));
+		joined.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
new file mode 100644
index 0000000..d83db06
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class FilterITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n";
+
+	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
+		data.add(new Tuple3<Integer, Long, String>(1,1l,"Hi"));
+		data.add(new Tuple3<Integer, Long, String>(2,2l,"Hello"));
+		data.add(new Tuple3<Integer, Long, String>(3,2l,"Hello world"));
+		data.add(new Tuple3<Integer, Long, String>(4,3l,"Hello world, how are you?"));
+		data.add(new Tuple3<Integer, Long, String>(5,3l,"I am fine."));
+		data.add(new Tuple3<Integer, Long, String>(6,3l,"Luke Skywalker"));
+		data.add(new Tuple3<Integer, Long, String>(7,4l,"Comment#1"));
+		data.add(new Tuple3<Integer, Long, String>(8,4l,"Comment#2"));
+		data.add(new Tuple3<Integer, Long, String>(9,4l,"Comment#3"));
+		data.add(new Tuple3<Integer, Long, String>(10,4l,"Comment#4"));
+		data.add(new Tuple3<Integer, Long, String>(11,5l,"Comment#5"));
+		data.add(new Tuple3<Integer, Long, String>(12,5l,"Comment#6"));
+		data.add(new Tuple3<Integer, Long, String>(13,5l,"Comment#7"));
+		data.add(new Tuple3<Integer, Long, String>(14,5l,"Comment#8"));
+		data.add(new Tuple3<Integer, Long, String>(15,5l,"Comment#9"));
+		data.add(new Tuple3<Integer, Long, String>(16,6l,"Comment#10"));
+		data.add(new Tuple3<Integer, Long, String>(17,6l,"Comment#11"));
+		data.add(new Tuple3<Integer, Long, String>(18,6l,"Comment#12"));
+		data.add(new Tuple3<Integer, Long, String>(19,6l,"Comment#13"));
+		data.add(new Tuple3<Integer, Long, String>(20,6l,"Comment#14"));
+		data.add(new Tuple3<Integer, Long, String>(21,6l,"Comment#15"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+				filter(value -> value.f2.contains("world"));
+		filterDs.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
new file mode 100644
index 0000000..714c14c
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class FlatJoinITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "2,what's really\n" +
+			"2,up really\n" +
+			"1,hello not\n" +
+			"1,hello much\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+				);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+				new Tuple2<Integer, String>(1, "not"),
+				new Tuple2<Integer, String>(1, "much"),
+				new Tuple2<Integer, String>(2, "really")
+				);
+		DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
+				.with((t,s,out) -> out.collect(new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1)));
+		joined.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
new file mode 100644
index 0000000..2b0e344
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class FlatMapITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "bb\n" +
+			"bb\n" +
+			"bc\n" +
+			"bd\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+		DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
+		flatMappedDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
new file mode 100644
index 0000000..23300c8
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class GroupReduceITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "abad\n" +
+			"aaac\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer,String>> stringDs = env.fromElements(
+				new Tuple2<Integer,String>(1, "aa"),
+				new Tuple2<Integer,String>(2, "ab"),
+				new Tuple2<Integer,String>(1, "ac"),
+				new Tuple2<Integer,String>(2, "ad")
+				);
+		DataSet<String> concatDs = stringDs
+				.groupBy(0)
+				.reduceGroup((values, out) -> {
+					String conc = "";
+					for (Tuple2<Integer,String> next : values) {
+						conc = conc.concat(next.f1);
+					}
+					out.collect(conc);
+				});
+		concatDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
new file mode 100644
index 0000000..aef35ac
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class JoinITCase extends JavaProgramTestBase {
+	
+	private static final String EXPECTED_RESULT = "2,what's really\n" +
+			"2,up really\n" +
+			"1,hello not\n" +
+			"1,hello much\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+				);
+		DataSet<Tuple2<Integer, String>> right = env.fromElements(
+				new Tuple2<Integer, String>(1, "not"),
+				new Tuple2<Integer, String>(1, "much"),
+				new Tuple2<Integer, String>(2, "really")
+				);
+		DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
+				.with((t,s) -> new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1));
+		joined.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
new file mode 100644
index 0000000..d4cf585
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class MapITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "bb\n" +
+			"bb\n" +
+			"bc\n" +
+			"bd\n";
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+		DataSet<String> mappedDs = stringDs.map(s -> s.replace("a", "b"));
+		mappedDs.writeAsText(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
new file mode 100644
index 0000000..52c215f
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class ReduceITCase extends JavaProgramTestBase {
+
+	private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
+			"2,3,2,Hallo Welt wie,1\n" +
+			"2,2,1,Hallo Welt,2\n" +
+			"3,9,0,P-),2\n" +
+			"3,6,5,BCD,3\n" +
+			"4,17,0,P-),1\n" +
+			"4,17,0,P-),2\n" +
+			"5,11,10,GHI,1\n" +
+			"5,29,0,P-),2\n" +
+			"5,25,0,P-),3\n";
+	
+	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(1,1l,0,"Hallo",1l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,2l,1,"Hallo Welt",2l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,3l,2,"Hallo Welt wie",1l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,4l,3,"Hallo Welt wie gehts?",2l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,5l,4,"ABC",2l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,6l,5,"BCD",3l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,7l,6,"CDE",2l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,8l,7,"DEF",1l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,9l,8,"EFG",1l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,10l,9,"FGH",2l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,11l,10,"GHI",1l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,12l,11,"HIJ",3l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,13l,12,"IJK",3l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,14l,13,"JKL",2l));
+		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,15l,14,"KLM",2l));
+
+		Collections.shuffle(data);
+
+		TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> type = new
+				TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+	
+	private String resultPath;
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+				.groupBy(4, 0)
+				.reduce((in1, in2) -> {
+					Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
+					out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
+					return out;
+				});
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-quickstart/README.md
----------------------------------------------------------------------
diff --git a/flink-quickstart/README.md b/flink-quickstart/README.md
index 9ba0b16..e81cd57 100644
--- a/flink-quickstart/README.md
+++ b/flink-quickstart/README.md
@@ -25,3 +25,7 @@ The `quickstart.sh` script always points to the current stable release (v0.4, v0
 
 
 (Use `-DarchetypeCatalog=local` for local testing during archetype development)
+
+# Java 8 with Lambda Expressions
+
+If you are planning to use Java 8 and want to use lambda expressions, please open the generated `pom.xml` file and uncomment/adjust the lines indicated there.
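
For orientation, the lines the note refers to are the compiler settings in the generated pom.xml: raise source/target to 1.8 and uncomment the block that switches the compiler to the Eclipse JDT compiler (tycho-compiler-jdt). A minimal sketch of the result; the authoritative, complete block ships commented out in the archetype pom.xml change further below:

	<!-- Sketch only; the full commented-out block is in the generated pom.xml. -->
	<plugin>
		<artifactId>maven-compiler-plugin</artifactId>
		<configuration>
			<source>1.8</source>
			<target>1.8</target>
			<compilerId>jdt</compilerId>
		</configuration>
		<dependencies>
			<dependency>
				<groupId>org.eclipse.tycho</groupId>
				<artifactId>tycho-compiler-jdt</artifactId>
				<version>0.21.0</version>
			</dependency>
		</dependencies>
	</plugin>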

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index bd81df5..3ca5c35 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -47,7 +47,7 @@ under the License.
 	</repositories>
 	
 	<!-- These two requirements are the minimum to use and develop Flink. 
-		You can add others like <artifactId>pact-scala-core</artifactId> for Scala! -->
+		You can add others like <artifactId>flink-scala</artifactId> for Scala! -->
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -95,10 +95,69 @@ under the License.
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>3.1</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.6</source> <!-- If you want to use Java 8, change this to "1.8" -->
+					<target>1.6</target> <!-- If you want to use Java 8, change this to "1.8" -->
 				</configuration>
 			</plugin>
 		</plugins>
+		
+		<!-- If you want to use Java 8 Lambda Expressions, uncomment the following lines -->
+		<!-- <pluginManagement>
+			<plugins>
+				<plugin>
+					<artifactId>maven-compiler-plugin</artifactId>
+					<configuration>
+						<source>1.8</source>
+						<target>1.8</target>
+						<compilerId>jdt</compilerId>
+					</configuration>
+					<dependencies>
+						<dependency>
+							<groupId>org.eclipse.tycho</groupId>
+							<artifactId>tycho-compiler-jdt</artifactId>
+							<version>0.21.0</version>
+						</dependency>
+					</dependencies>
+				</plugin>
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-assembly-plugin</artifactId>
+										<versionRange>[2.4,)</versionRange>
+										<goals>
+											<goal>single</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-compiler-plugin</artifactId>
+										<versionRange>[3.1,)</versionRange>
+										<goals>
+											<goal>testCompile</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+		-->
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
index 4e424da..840b948 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
@@ -53,7 +53,7 @@ public class Job {
 		 * 	.filter()
 		 * 	.flatMap()
 		 * 	.join()
-		 * 	.group()
+		 * 	.coGroup()
 		 * and many more.
 		 * Have a look at the programming guide for the Java API:
 		 *

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f18b4c5..3874849 100644
--- a/pom.xml
+++ b/pom.xml
@@ -354,7 +354,7 @@ under the License.
 					<jdk>1.8</jdk>
 				</activation>
 				<modules>
-					<module>flink-java8-tests</module>
+					<module>flink-java8</module>
 				</modules>
 				<build>
 					<plugins>