You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/26 10:07:21 UTC
[3/4] [FLINK-1062] Type Extraction for Lambdas
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
deleted file mode 100644
index b8344a9..0000000
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-@SuppressWarnings("serial")
-public class ReduceITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
- "2,3,2,Hallo Welt wie,1\n" +
- "2,2,1,Hallo Welt,2\n" +
- "3,9,0,P-),2\n" +
- "3,6,5,BCD,3\n" +
- "4,17,0,P-),1\n" +
- "4,17,0,P-),2\n" +
- "5,11,10,GHI,1\n" +
- "5,29,0,P-),2\n" +
- "5,25,0,P-),3\n";
-
- public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
-
- List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(1,1l,0,"Hallo",1l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(2,2l,1,"Hallo Welt",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(2,3l,2,"Hallo Welt wie",1l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,4l,3,"Hallo Welt wie gehts?",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,5l,4,"ABC",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,6l,5,"BCD",3l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,7l,6,"CDE",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,8l,7,"DEF",1l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,9l,8,"EFG",1l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,10l,9,"FGH",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,11l,10,"GHI",1l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,12l,11,"HIJ",3l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,13l,12,"IJK",3l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,14l,13,"JKL",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,15l,14,"KLM",2l));
-
- Collections.shuffle(data);
-
- TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new
- TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>>(
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO
- );
-
- return env.fromCollection(data, type);
- }
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
- .groupBy(4, 0)
- .reduce((in1, in2) -> {
- Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
- out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
- return out;
- });
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
new file mode 100644
index 0000000..e733af5
--- /dev/null
+++ b/flink-java8/pom.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>0.7-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-java8</artifactId>
+ <name>flink-java8</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.7</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <!-- just define the Java version to be used for compiling and plugins -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version><!--$NO-MVN-MAN-VER$-->
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <log.level>WARN</log.level>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <log.level>WARN</log.level>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <compilerId>jdt</compilerId>
+ </configuration>
+ <dependencies>
+ <!-- This dependency provides the implementation of compiler "jdt": -->
+ <dependency>
+ <groupId>org.eclipse.tycho</groupId>
+ <artifactId>tycho-compiler-jdt</artifactId>
+ <version>0.21.0</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <versionRange>[2.4,)</versionRange>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <versionRange>[3.1,)</versionRange>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java b/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
new file mode 100644
index 0000000..9b67a43
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java8.relational;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+
+/**
+ * This program implements a modified version of the TPC-H query 10.
+ * The original query can be found at
+ * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
+ *
+ * <p>
+ * This program implements the following SQL equivalent:
+ *
+ * <p>
+ * <code><pre>
+ * SELECT
+ * c_custkey,
+ * c_name,
+ * c_address,
+ * n_name,
+ * c_acctbal,
+ * SUM(l_extendedprice * (1 - l_discount)) AS revenue
+ * FROM
+ * customer,
+ * orders,
+ * lineitem,
+ * nation
+ * WHERE
+ * c_custkey = o_custkey
+ * AND l_orderkey = o_orderkey
+ * AND YEAR(o_orderdate) > '1990'
+ * AND l_returnflag = 'R'
+ * AND c_nationkey = n_nationkey
+ * GROUP BY
+ * c_custkey,
+ * c_name,
+ * c_acctbal,
+ * n_name,
+ * c_address
+ * </pre></code>
+ *
+ * <p>
+ * Compared to the original TPC-H query this version does not print
+ * c_phone and c_comment, only filters by years greater than 1990 instead of
+ * a period of 3 months, and does not sort the result by revenue.
+ *
+ * <p>
+ * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
+ *
+ * <p>
+ * Usage: <code>TPCHQuery10 &lt;customer-csv path&gt; &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;nation-csv path&gt; &lt;result path&gt;</code><br>
+ *
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li> inline-defined functions using Java 8 Lambda Expressions
+ * </ul>
+ */
+public class TPCHQuery10 {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+ // abort when the five required paths were not supplied; parseParameters has already printed the usage text
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // get customer data set: (custkey, name, address, nationkey, acctbal)
+ DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
+
+ // get orders data set: (orderkey, custkey, orderdate)
+ DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
+
+ // get lineitem data set: (orderkey, extendedprice, discount, returnflag)
+ DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
+
+ // get nation data set: (nationkey, name)
+ DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
+
+ // orders filtered by year: (orderkey, custkey)
+ DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
+ // keep only orders whose year (first four characters of orderdate) is greater than 1990
+ orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
+ // project out the fields that are no longer required
+ .project(0,1).types(Integer.class, Integer.class);
+
+ // lineitems filtered by flag: (orderkey, extendedprice, discount)
+ DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag =
+ // keep only line items whose returnflag equals "R"
+ lineitems.filter(lineitem -> lineitem.f3.equals("R"))
+ // project out the fields that are no longer required
+ .project(0,1,2).types(Integer.class, Double.class, Double.class);
+
+ // join orders with lineitems on orderkey: (custkey, extendedprice, discount)
+ DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey =
+ ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
+ .where(0).equalTo(0)
+ .projectFirst(1).projectSecond(1,2)
+ .types(Integer.class, Double.class, Double.class);
+
+ // aggregate for revenue: (custkey, revenue)
+ DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
+ // calculate the revenue for each item
+ // revenue per item = l_extendedprice * (1 - l_discount)
+ .map(i -> new Tuple2<>(i.f0, i.f1 * (1 - i.f2)))
+ // aggregate the revenues per item to revenue per customer
+ .groupBy(0).sum(1);
+
+ // join customer with nation on nationkey: (custkey, name, address, nationname, acctbal)
+ DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
+ .joinWithTiny(nations)
+ .where(3).equalTo(0)
+ .projectFirst(0,1,2).projectSecond(1).projectFirst(4)
+ .types(Integer.class, String.class, String.class, String.class, Double.class);
+
+ // join customer (with nation) with revenue on custkey: (custkey, name, address, nationname, acctbal, revenue)
+ DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue =
+ customerWithNation.join(revenueOfCustomerKey)
+ .where(0).equalTo(0)
+ .projectFirst(0,1,2,3,4).projectSecond(1)
+ .types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
+
+ // emit result as CSV to the configured output path
+ customerWithRevenue.writeAsCsv(outputPath);
+
+ // execute program
+ env.execute("TPCH Query 10 Example");
+
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static String customerPath;
+ private static String ordersPath;
+ private static String lineitemPath;
+ private static String nationPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] programArguments) {
+ // returns true only when exactly five path arguments are given (they are stored in the static path fields); otherwise prints usage to stderr and returns false
+ if(programArguments.length > 0) {
+ if(programArguments.length == 5) {
+ customerPath = programArguments[0];
+ ordersPath = programArguments[1];
+ lineitemPath = programArguments[2];
+ nationPath = programArguments[3];
+ outputPath = programArguments[4];
+ } else {
+ System.err.println("Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
+ return false;
+ }
+ } else {
+ System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+ " Due to legal restrictions, we can not ship generated data.\n" +
+ " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+ " Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
+ return false;
+ }
+ return true;
+ }
+
+ private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
+ return env.readCsvFile(customerPath)
+ .fieldDelimiter('|')
+ .includeFields("11110100") // mask: '1' reads the column, '0' skips it -> columns 0-3 and 5 (custkey, name, address, nationkey, acctbal)
+ .types(Integer.class, String.class, String.class, Integer.class, Double.class);
+ }
+
+ private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
+ return env.readCsvFile(ordersPath)
+ .fieldDelimiter('|')
+ .includeFields("110010000") // mask: '1' reads the column, '0' skips it -> columns 0, 1 and 4 (orderkey, custkey, orderdate)
+ .types(Integer.class, Integer.class, String.class);
+ }
+
+ private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) {
+ return env.readCsvFile(lineitemPath)
+ .fieldDelimiter('|')
+ .includeFields("1000011010000000") // mask: '1' reads the column, '0' skips it -> columns 0, 5, 6 and 8 (orderkey, extendedprice, discount, returnflag)
+ .types(Integer.class, Double.class, Double.class, String.class);
+ }
+
+ private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
+ return env.readCsvFile(nationPath)
+ .fieldDelimiter('|')
+ .includeFields("1100") // mask: '1' reads the column, '0' skips it -> columns 0 and 1 (nationkey, name)
+ .types(Integer.class, String.class);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
new file mode 100644
index 0000000..793962f
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java8.wordcount;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.example.java8.wordcount.util.WordCountData;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over text files.
+ *
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a compact Flink program with Java 8 Lambda Expressions.
+ * </ul>
+ *
+ */
+public class WordCount {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+ // abort on a wrong argument count; parseParameters has already printed the usage text
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ DataSet<String> text = getTextDataSet(env);
+
+ DataSet<Tuple2<String, Integer>> counts =
+ // normalize (lower-case) each line and split it at runs of non-word characters
+ text.map(line -> line.toLowerCase().split("\\W+"))
+ // convert each split line into pairs (2-tuples) containing: (word,1)
+ .flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
+ // emit the pairs with non-zero-length words
+ Arrays.stream(tokens)
+ .filter(t -> t.length() > 0)
+ .forEach(t -> out.collect(new Tuple2<>(t, 1)));
+ })
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0)
+ .sum(1);
+
+ // emit result: written via writeAsCsv (newline between records, space between fields) when paths were given, printed otherwise
+ if(fileOutput) {
+ counts.writeAsCsv(outputPath, "\n", " ");
+ } else {
+ counts.print();
+ }
+
+ // execute program
+ env.execute("WordCount Example");
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] args) {
+ // no arguments: run on the built-in data and return true; exactly two arguments: file input/output, return true; any other count: print usage and return false
+ if(args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if(args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: WordCount <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing WordCount example with built-in default data.");
+ System.out.println(" Provide parameters to read input data from a file.");
+ System.out.println(" Usage: WordCount <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ // fileOutput also selects the input source: when arguments were given, read the text file from the given input path
+ return env.readTextFile(textPath);
+ } else {
+ // no arguments were given: use the built-in default text data
+ return WordCountData.getDefaultTextLineDataSet(env);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
new file mode 100644
index 0000000..9933696
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java8.wordcount.util;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Provides the default data sets used for the WordCount example program.
+ * The default data sets are used if no parameters are given to the program.
+ *
+ */
+public class WordCountData {
+
+ public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
+ // built-in sample input: the "To be, or not to be" soliloquy, one data set element per line of verse
+ return env.fromElements(
+ "To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,",
+ "And by opposing end them?--To die,--to sleep,--",
+ "No more; and by a sleep to say we end",
+ "The heartache, and the thousand natural shocks",
+ "That flesh is heir to,--'tis a consummation",
+ "Devoutly to be wish'd. To die,--to sleep;--",
+ "To sleep! perchance to dream:--ay, there's the rub;",
+ "For in that sleep of death what dreams may come,",
+ "When we have shuffled off this mortal coil,",
+ "Must give us pause: there's the respect",
+ "That makes calamity of so long life;",
+ "For who would bear the whips and scorns of time,",
+ "The oppressor's wrong, the proud man's contumely,",
+ "The pangs of despis'd love, the law's delay,",
+ "The insolence of office, and the spurns",
+ "That patient merit of the unworthy takes,",
+ "When he himself might his quietus make",
+ "With a bare bodkin? who would these fardels bear,",
+ "To grunt and sweat under a weary life,",
+ "But that the dread of something after death,--",
+ "The undiscover'd country, from whose bourn",
+ "No traveller returns,--puzzles the will,",
+ "And makes us rather bear those ills we have",
+ "Than fly to others that we know not of?",
+ "Thus conscience does make cowards of us all;",
+ "And thus the native hue of resolution",
+ "Is sicklied o'er with the pale cast of thought;",
+ "And enterprises of great pith and moment,",
+ "With this regard, their currents turn awry,",
+ "And lose the name of action.--Soft you now!",
+ "The fair Ophelia!--Nymph, in thy orisons",
+ "Be all my sins remember'd."
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
new file mode 100644
index 0000000..fa85f8c
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.type.lambdas;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import junit.framework.Assert;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.junit.Test;
+
+/**
+ * Tests that {@code FunctionUtils.checkAndExtractLambdaMethod} tells lambdas apart from
+ * class-based functions, and that {@link TypeExtractor} reports the declared output type for
+ * lambdas implementing the individual function interfaces.
+ */
+@SuppressWarnings("serial")
+public class LambdaExtractionTest {
+
+    /** Type string of the (first) input used by the tuple-based extraction tests. */
+    private static final String TUPLE_BOOLEAN_INPUT = "Tuple2<Tuple1<Integer>, Boolean>";
+
+    /** Type string of the second input used by the two-input extraction tests. */
+    private static final String TUPLE_DOUBLE_INPUT = "Tuple2<Tuple1<Integer>, Double>";
+
+    /** A lambda stored in a static field; {@link #testIdentifyLambdas()} expects it to be detected. */
+    private static final MapFunction<String, Integer> STATIC_LAMBDA = (str) -> Integer.parseInt(str);
+
+    /**
+     * Anonymous classes, a named class and an anonymous implementation of a derived interface
+     * must yield {@code null}; lambdas (local and static) must yield a non-null result.
+     *
+     * @throws Exception any unexpected error is propagated so that JUnit reports its full stack trace
+     */
+    @Test
+    public void testIdentifyLambdas() throws Exception {
+        MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
+            @Override
+            public Integer map(String value) { return Integer.parseInt(value); }
+        };
+
+        MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
+            @Override
+            public Integer map(String value) { return Integer.parseInt(value); }
+        };
+
+        MapFunction<?, ?> fromProperClass = new StaticMapper();
+
+        MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
+            @Override
+            public Tuple2<Integer, Long> map(Integer value) {
+                return new Tuple2<Integer, Long>(value, 1L);
+            }
+        };
+
+        MapFunction<String, Integer> lambda = (str) -> Integer.parseInt(str);
+
+        assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromInterface));
+        assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromClass));
+        assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromProperClass));
+        assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromDerived));
+        assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(lambda));
+        assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(STATIC_LAMBDA));
+    }
+
+    /** A {@link MapFunction} implemented as a regular named class (i.e. not a lambda). */
+    public static class StaticMapper implements MapFunction<String, Integer> {
+
+        @Override
+        public Integer map(String value) { return Integer.parseInt(value); }
+    }
+
+    /** A derived function interface, used to check anonymous implementations of sub-interfaces. */
+    public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
+
+        @Override
+        public Tuple2<T, Long> map(T value) throws Exception;
+    }
+
+    /** Holder whose lambda reads an instance field of the enclosing object. */
+    public static class MyClass {
+        private String s = "mystring";
+
+        public MapFunction<Integer, String> getMapFunction() {
+            return (i) -> s;
+        }
+    }
+
+    /** A lambda that reads a member variable must still be extracted as returning {@code String}. */
+    @Test
+    public void testLambdaWithMemberVariable() {
+        TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), TypeInfoParser.parse("Integer"));
+        Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
+    }
+
+    /** A lambda that uses local variables must still be extracted as returning {@code String}. */
+    @Test
+    public void testLambdaWithLocalVariable() {
+        // Deliberately mixes a reference, an explicitly final and an effectively final local.
+        String s = "mystring";
+        final int k = 24;
+        int j = 26;
+
+        MapFunction<Integer, String> f = (i) -> s + k + j;
+
+        TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Integer"));
+        Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
+    }
+
+    @Test
+    public void testMapLambda() {
+        MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
+
+        TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse(TUPLE_BOOLEAN_INPUT));
+        assertNestedTupleWithString(ti);
+    }
+
+    @Test
+    public void testFlatMapLambda() {
+        FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
+
+        TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, TypeInfoParser.parse(TUPLE_BOOLEAN_INPUT));
+        assertNestedTupleWithString(ti);
+    }
+
+    @Test
+    public void testMapPartitionLambda() {
+        MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
+
+        TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, TypeInfoParser.parse(TUPLE_BOOLEAN_INPUT));
+        assertNestedTupleWithString(ti);
+    }
+
+    @Test
+    public void testGroupReduceLambda() {
+        GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
+
+        TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(f, TypeInfoParser.parse(TUPLE_BOOLEAN_INPUT));
+        assertNestedTupleWithString(ti);
+    }
+
+    @Test
+    public void testFlatJoinLambda() {
+        FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
+
+        TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, TypeInfoParser.parse(TUPLE_BOOLEAN_INPUT), TypeInfoParser.parse(TUPLE_DOUBLE_INPUT));
+        assertNestedTupleWithString(ti);
+    }
+
+    @Test
+    public void testJoinLambda() {
+        JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
+
+        TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, TypeInfoParser.parse(TUPLE_BOOLEAN_INPUT), TypeInfoParser.parse(TUPLE_DOUBLE_INPUT));
+        assertNestedTupleWithString(ti);
+    }
+
+    @Test
+    public void testCoGroupLambda() {
+        CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
+
+        TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, TypeInfoParser.parse(TUPLE_BOOLEAN_INPUT), TypeInfoParser.parse(TUPLE_DOUBLE_INPUT));
+        assertNestedTupleWithString(ti);
+    }
+
+    @Test
+    public void testCrossLambda() {
+        CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
+
+        TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, TypeInfoParser.parse(TUPLE_BOOLEAN_INPUT), TypeInfoParser.parse(TUPLE_DOUBLE_INPUT));
+        assertNestedTupleWithString(ti);
+    }
+
+    @Test
+    public void testKeySelectorLambda() {
+        KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
+
+        TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, TypeInfoParser.parse(TUPLE_BOOLEAN_INPUT));
+        assertNestedTupleWithString(ti);
+    }
+
+    /**
+     * A lambda declared with raw {@code Tuple1} types is expected to be rejected with an
+     * {@link InvalidTypesException}.
+     */
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testLambdaTypeErasureException() {
+        MapFunction<Tuple1, Tuple1> f = (i) -> null;
+
+        try {
+            TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple1<String>"));
+            fail("InvalidTypesException expected for a lambda declared with raw Tuple1 types");
+        }
+        catch (InvalidTypesException expected) {
+            // this is the outcome the test asks for
+        }
+    }
+
+    /**
+     * Asserts that {@code ti} describes a tuple of arity 2 whose first field is itself a tuple
+     * and whose second field is a {@code String}.
+     *
+     * @param ti the extracted type information to verify
+     */
+    private static void assertNestedTupleWithString(TypeInformation<?> ti) {
+        Assert.assertTrue(ti.isTupleType());
+        Assert.assertEquals(2, ti.getArity());
+
+        TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) ti;
+        Assert.assertTrue(tupleInfo.getTypeAt(0).isTupleType());
+        // expected value first, so that a failure message reads the right way round
+        Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleInfo.getTypeAt(1));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
new file mode 100644
index 0000000..1420483
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Integration test that runs an ungrouped {@code reduceGroup} whose function is written as a
+ * Java 8 lambda.
+ */
+public class AllGroupReduceITCase extends JavaProgramTestBase {
+
+    /** All four input strings concatenated into a single output line. */
+    private static final String EXPECTED_RESULT = "aaabacad\n";
+
+    /** Location the program writes its output to; assigned in {@link #preSubmit()}. */
+    private String resultPath;
+
+    /** Resolves the temp directory path used as output location. */
+    @Override
+    protected void preSubmit() throws Exception {
+        this.resultPath = getTempDirPath("result");
+    }
+
+    /** Concatenates every element of the data set and writes the result as text. */
+    @Override
+    protected void testProgram() throws Exception {
+        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<String> input = environment.fromElements("aa", "ab", "ac", "ad");
+        DataSet<String> concatenated = input.reduceGroup((elements, collector) -> {
+            String joined = "";
+            for (String element : elements) {
+                joined = joined.concat(element);
+            }
+            collector.collect(joined);
+        });
+        concatenated.writeAsText(resultPath);
+        environment.execute();
+    }
+
+    /** Compares the lines written to the output location with {@link #EXPECTED_RESULT}. */
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
new file mode 100644
index 0000000..667a786
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Integration test that runs a {@code coGroup} whose function is written as a Java 8 lambda.
+ */
+public class CoGroupITCase extends JavaProgramTestBase {
+
+    /** One line per key: the sum of the key fields of all tuples co-grouped under that key. */
+    private static final String EXPECTED_RESULT = "6\n3\n";
+
+    /** Location the program writes its output to; assigned in {@link #preSubmit()}. */
+    private String resultPath;
+
+    /** Resolves the temp directory path used as output location. */
+    @Override
+    protected void preSubmit() throws Exception {
+        this.resultPath = getTempDirPath("result");
+    }
+
+    /** Co-groups both inputs on field 0 and emits the sum of field 0 over each group. */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void testProgram() throws Exception {
+        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<Tuple2<Integer, String>> left = environment.fromElements(
+                new Tuple2<>(1, "hello"),
+                new Tuple2<>(2, "what's"),
+                new Tuple2<>(2, "up")
+        );
+        DataSet<Tuple2<Integer, String>> right = environment.fromElements(
+                new Tuple2<>(1, "not"),
+                new Tuple2<>(1, "much"),
+                new Tuple2<>(2, "really")
+        );
+        DataSet<Integer> keySums = left.coGroup(right).where(0).equalTo(0)
+                .with((leftGroup, rightGroup, collector) -> {
+                    int total = 0;
+                    for (Tuple2<Integer, String> tuple : leftGroup) {
+                        total += tuple.f0;
+                    }
+                    for (Tuple2<Integer, String> tuple : rightGroup) {
+                        total += tuple.f0;
+                    }
+                    collector.collect(total);
+                });
+        keySums.writeAsText(resultPath);
+        environment.execute();
+    }
+
+    /** Compares the lines written to the output location with {@link #EXPECTED_RESULT}. */
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
new file mode 100644
index 0000000..60916c9
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Integration test that runs a {@code cross} whose function is written as a Java 8 lambda.
+ */
+public class CrossITCase extends JavaProgramTestBase {
+
+    /** One CSV line per pair of the cross product: summed keys and space-joined strings. */
+    private static final String EXPECTED_RESULT = "2,hello not\n" +
+            "3,what's not\n" +
+            "3,up not\n" +
+            "2,hello much\n" +
+            "3,what's much\n" +
+            "3,up much\n" +
+            "3,hello really\n" +
+            "4,what's really\n" +
+            "4,up really";
+
+    /** Location the program writes its output to; assigned in {@link #preSubmit()}. */
+    private String resultPath;
+
+    /** Resolves the temp directory path used as output location. */
+    @Override
+    protected void preSubmit() throws Exception {
+        this.resultPath = getTempDirPath("result");
+    }
+
+    /** Crosses both inputs, adding the integer fields and joining the strings with a space. */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void testProgram() throws Exception {
+        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<Tuple2<Integer, String>> left = environment.fromElements(
+                new Tuple2<>(1, "hello"),
+                new Tuple2<>(2, "what's"),
+                new Tuple2<>(2, "up")
+        );
+        DataSet<Tuple2<Integer, String>> right = environment.fromElements(
+                new Tuple2<>(1, "not"),
+                new Tuple2<>(1, "much"),
+                new Tuple2<>(2, "really")
+        );
+        DataSet<Tuple2<Integer, String>> crossed = left.cross(right)
+                .with((first, second) ->
+                        new Tuple2<Integer, String>(first.f0 + second.f0, first.f1 + " " + second.f1));
+        crossed.writeAsCsv(resultPath);
+        environment.execute();
+    }
+
+    /** Compares the lines written to the output location with {@link #EXPECTED_RESULT}. */
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
new file mode 100644
index 0000000..d83db06
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Integration test that runs a {@code filter} whose predicate is written as a Java 8 lambda.
+ */
+public class FilterITCase extends JavaProgramTestBase {
+
+    /** The two input tuples whose third field contains "world", in CSV form. */
+    private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
+            "4,3,Hello world, how are you?\n";
+
+    /** Location the program writes its output to; assigned in {@link #preSubmit()}. */
+    private String resultPath;
+
+    /**
+     * Builds a data set of 21 fixed three-field tuples, shuffled into random order.
+     *
+     * @param env the environment used to create the data set
+     * @return the data set created from the shuffled tuples
+     */
+    public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
+
+        List<Tuple3<Integer, Long, String>> tuples = new ArrayList<>();
+        tuples.add(new Tuple3<>(1, 1L, "Hi"));
+        tuples.add(new Tuple3<>(2, 2L, "Hello"));
+        tuples.add(new Tuple3<>(3, 2L, "Hello world"));
+        tuples.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
+        tuples.add(new Tuple3<>(5, 3L, "I am fine."));
+        tuples.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
+        tuples.add(new Tuple3<>(7, 4L, "Comment#1"));
+        tuples.add(new Tuple3<>(8, 4L, "Comment#2"));
+        tuples.add(new Tuple3<>(9, 4L, "Comment#3"));
+        tuples.add(new Tuple3<>(10, 4L, "Comment#4"));
+        tuples.add(new Tuple3<>(11, 5L, "Comment#5"));
+        tuples.add(new Tuple3<>(12, 5L, "Comment#6"));
+        tuples.add(new Tuple3<>(13, 5L, "Comment#7"));
+        tuples.add(new Tuple3<>(14, 5L, "Comment#8"));
+        tuples.add(new Tuple3<>(15, 5L, "Comment#9"));
+        tuples.add(new Tuple3<>(16, 6L, "Comment#10"));
+        tuples.add(new Tuple3<>(17, 6L, "Comment#11"));
+        tuples.add(new Tuple3<>(18, 6L, "Comment#12"));
+        tuples.add(new Tuple3<>(19, 6L, "Comment#13"));
+        tuples.add(new Tuple3<>(20, 6L, "Comment#14"));
+        tuples.add(new Tuple3<>(21, 6L, "Comment#15"));
+
+        // randomize the element order before handing the list to the environment
+        Collections.shuffle(tuples);
+
+        return env.fromCollection(tuples);
+    }
+
+    /** Resolves the temp directory path used as output location. */
+    @Override
+    protected void preSubmit() throws Exception {
+        this.resultPath = getTempDirPath("result");
+    }
+
+    /** Keeps only the tuples whose string field contains "world" and writes them as CSV. */
+    @Override
+    protected void testProgram() throws Exception {
+        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input = get3TupleDataSet(environment);
+        DataSet<Tuple3<Integer, Long, String>> filtered =
+                input.filter(tuple -> tuple.f2.contains("world"));
+        filtered.writeAsCsv(resultPath);
+        environment.execute();
+    }
+
+    /** Compares the lines written to the output location with {@link #EXPECTED_RESULT}. */
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
new file mode 100644
index 0000000..714c14c
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Integration test that runs a {@code join} whose collector-style join function is written as a
+ * Java 8 lambda.
+ */
+public class FlatJoinITCase extends JavaProgramTestBase {
+
+    /** One CSV line per joined pair: the left key and both strings joined with a space. */
+    private static final String EXPECTED_RESULT = "2,what's really\n" +
+            "2,up really\n" +
+            "1,hello not\n" +
+            "1,hello much\n";
+
+    /** Location the program writes its output to; assigned in {@link #preSubmit()}. */
+    private String resultPath;
+
+    /** Resolves the temp directory path used as output location. */
+    @Override
+    protected void preSubmit() throws Exception {
+        this.resultPath = getTempDirPath("result");
+    }
+
+    /** Joins both inputs on field 0 and collects one combined tuple per matching pair. */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void testProgram() throws Exception {
+        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<Tuple2<Integer, String>> left = environment.fromElements(
+                new Tuple2<>(1, "hello"),
+                new Tuple2<>(2, "what's"),
+                new Tuple2<>(2, "up")
+        );
+        DataSet<Tuple2<Integer, String>> right = environment.fromElements(
+                new Tuple2<>(1, "not"),
+                new Tuple2<>(1, "much"),
+                new Tuple2<>(2, "really")
+        );
+        DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
+                .with((first, second, collector) ->
+                        collector.collect(new Tuple2<Integer, String>(first.f0, first.f1 + " " + second.f1)));
+        joined.writeAsCsv(resultPath);
+        environment.execute();
+    }
+
+    /** Compares the lines written to the output location with {@link #EXPECTED_RESULT}. */
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
new file mode 100644
index 0000000..2b0e344
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Integration test that runs a {@code flatMap} whose function is written as a Java 8 lambda.
+ */
+public class FlatMapITCase extends JavaProgramTestBase {
+
+    /** The input strings with every "a" replaced by "b", one per line. */
+    private static final String EXPECTED_RESULT = "bb\n" +
+            "bb\n" +
+            "bc\n" +
+            "bd\n";
+
+    /** Location the program writes its output to; assigned in {@link #preSubmit()}. */
+    private String resultPath;
+
+    /** Resolves the temp directory path used as output location. */
+    @Override
+    protected void preSubmit() throws Exception {
+        this.resultPath = getTempDirPath("result");
+    }
+
+    /** Emits, for every input string, a copy in which "a" is replaced by "b". */
+    @Override
+    protected void testProgram() throws Exception {
+        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<String> input = environment.fromElements("aa", "ab", "ac", "ad");
+        DataSet<String> replaced = input.flatMap(
+                (text, collector) -> collector.collect(text.replace("a", "b")));
+        replaced.writeAsText(resultPath);
+        environment.execute();
+    }
+
+    /** Compares the lines written to the output location with {@link #EXPECTED_RESULT}. */
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
new file mode 100644
index 0000000..23300c8
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Integration test that runs a grouped {@code reduceGroup} whose function is written as a
+ * Java 8 lambda.
+ */
+public class GroupReduceITCase extends JavaProgramTestBase {
+
+    /** One line per key: the string fields of that key's tuples, concatenated. */
+    private static final String EXPECTED_RESULT = "abad\n" +
+            "aaac\n";
+
+    /** Location the program writes its output to; assigned in {@link #preSubmit()}. */
+    private String resultPath;
+
+    /** Resolves the temp directory path used as output location. */
+    @Override
+    protected void preSubmit() throws Exception {
+        this.resultPath = getTempDirPath("result");
+    }
+
+    /** Groups the tuples on field 0 and concatenates the string field of each group. */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void testProgram() throws Exception {
+        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<Tuple2<Integer, String>> input = environment.fromElements(
+                new Tuple2<>(1, "aa"),
+                new Tuple2<>(2, "ab"),
+                new Tuple2<>(1, "ac"),
+                new Tuple2<>(2, "ad")
+        );
+        DataSet<String> concatenated = input
+                .groupBy(0)
+                .reduceGroup((group, collector) -> {
+                    String joined = "";
+                    for (Tuple2<Integer, String> tuple : group) {
+                        joined = joined.concat(tuple.f1);
+                    }
+                    collector.collect(joined);
+                });
+        concatenated.writeAsText(resultPath);
+        environment.execute();
+    }
+
+    /** Compares the lines written to the output location with {@link #EXPECTED_RESULT}. */
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
new file mode 100644
index 0000000..aef35ac
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class JoinITCase extends JavaProgramTestBase { // Runs a join whose join function is a Java 8 lambda and compares the CSV output with EXPECTED_RESULT.
+
+ private static final String EXPECTED_RESULT = "2,what's really\n" + // one line per matching (left, right) pair: key, then "left right"
+ "2,up really\n" +
+ "1,hello not\n" +
+ "1,hello much\n";
+
+ private String resultPath; // assigned in preSubmit(), used as sink path in testProgram(), read back in postSubmit()
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result"); // path for this run's output, obtained from the test base class
+ }
+
+ @SuppressWarnings("unchecked") // NOTE(review): presumably needed for the generic varargs calls to fromElements below; confirm
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Integer, String>> left = env.fromElements( // left input: one tuple with key 1, two with key 2
+ new Tuple2<Integer, String>(1, "hello"),
+ new Tuple2<Integer, String>(2, "what's"),
+ new Tuple2<Integer, String>(2, "up")
+ );
+ DataSet<Tuple2<Integer, String>> right = env.fromElements( // right input: two tuples with key 1, one with key 2
+ new Tuple2<Integer, String>(1, "not"),
+ new Tuple2<Integer, String>(1, "much"),
+ new Tuple2<Integer, String>(2, "really")
+ );
+ DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0) // join on field 0 of both inputs
+ .with((t,s) -> new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1)); // keep the left key; join the two strings with a single space
+ joined.writeAsCsv(resultPath); // CSV sink at resultPath
+ env.execute(); // run the program defined above
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); // check the written output against the expected lines
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
new file mode 100644
index 0000000..d4cf585
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class MapITCase extends JavaProgramTestBase { // Runs a map written as a Java 8 lambda and compares the text output with EXPECTED_RESULT.
+
+ private static final String EXPECTED_RESULT = "bb\n" + // "aa" and "ab" both become "bb", so that line appears twice
+ "bb\n" +
+ "bc\n" +
+ "bd\n";
+
+ private String resultPath; // assigned in preSubmit(), used as sink path in testProgram(), read back in postSubmit()
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result"); // path for this run's output, obtained from the test base class
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+ DataSet<String> mappedDs = stringDs.map (s -> s.replace("a", "b")); // replace every "a" with "b" in each element
+ mappedDs.writeAsText(resultPath); // text sink at resultPath
+ env.execute(); // run the program defined above
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); // check the written output against the expected lines
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
new file mode 100644
index 0000000..52c215f
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class ReduceITCase extends JavaProgramTestBase { // Runs a grouped reduce written as a Java 8 lambda and compares the CSV output with EXPECTED_RESULT.
+
+ private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" + // groups holding a single tuple appear with their original field values
+ "2,3,2,Hallo Welt wie,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "3,9,0,P-),2\n" + // groups with several tuples: field 1 summed, field 2 set to 0, field 3 set to "P-)"
+ "3,6,5,BCD,3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,10,GHI,1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
+
+ public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) { // builds the 15-tuple input data set in shuffled order with explicit type information
+
+ List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(1,1l,0,"Hallo",1l)); // NOTE(review): the lowercase "l" long suffix is easy to misread as the digit 1; "L" is the usual spelling
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(2,2l,1,"Hallo Welt",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(2,3l,2,"Hallo Welt wie",1l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,4l,3,"Hallo Welt wie gehts?",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,5l,4,"ABC",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,6l,5,"BCD",3l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,7l,6,"CDE",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,8l,7,"DEF",1l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,9l,8,"EFG",1l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,10l,9,"FGH",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,11l,10,"GHI",1l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,12l,11,"HIJ",3l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,13l,12,"IJK",3l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,14l,13,"JKL",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,15l,14,"KLM",2l));
+
+ Collections.shuffle(data); // randomize the element order before the data set is created
+
+ TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new // explicit type information: one entry per tuple field, in field order
+ TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO
+ );
+
+ return env.fromCollection(data, type);
+ }
+
+ private String resultPath; // assigned in preSubmit(), used as sink path in testProgram(), read back in postSubmit()
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result"); // path for this run's output, obtained from the test base class
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+ .groupBy(4, 0) // group on tuple fields 4 and 0
+ .reduce((in1, in2) -> { // lambda without declared parameter types; this is what the test exercises
+ Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
+ out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4); // keep the grouping fields f0 and f4, sum f1, overwrite f2 and f3 with constants
+ return out;
+ });
+
+ reduceDs.writeAsCsv(resultPath); // CSV sink at resultPath
+ env.execute(); // run the program defined above
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); // check the written output against the expected lines
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-quickstart/README.md
----------------------------------------------------------------------
diff --git a/flink-quickstart/README.md b/flink-quickstart/README.md
index 9ba0b16..e81cd57 100644
--- a/flink-quickstart/README.md
+++ b/flink-quickstart/README.md
@@ -25,3 +25,7 @@ The `quickstart.sh` script always points to the current stable release (v0.4, v0
(Use `-DarchetypeCatalog=local` for local testing during archetype development)
+
+# Java 8 with Lambda Expressions
+
+If you are planning to use Java 8 and want to use Lambda Expressions, please open the generated "pom.xml" file and modify/uncomment the lines marked for Java 8.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index bd81df5..3ca5c35 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -47,7 +47,7 @@ under the License.
</repositories>
<!-- These two requirements are the minimum to use and develop Flink.
- You can add others like <artifactId>pact-scala-core</artifactId> for Scala! -->
+ You can add others like <artifactId>flink-scala</artifactId> for Scala! -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -95,10 +95,69 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.6</source> <!-- If you want to use Java 8, change this to "1.8" -->
+ <target>1.6</target> <!-- If you want to use Java 8, change this to "1.8" -->
</configuration>
</plugin>
</plugins>
+
+ <!-- If you want to use Java 8 Lambda Expressions, uncomment the following lines -->
+ <!-- <pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <compilerId>jdt</compilerId>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.tycho</groupId>
+ <artifactId>tycho-compiler-jdt</artifactId>
+ <version>0.21.0</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <versionRange>[2.4,)</versionRange>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <versionRange>[3.1,)</versionRange>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ -->
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
index 4e424da..840b948 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
@@ -53,7 +53,7 @@ public class Job {
* .filter()
* .flatMap()
* .join()
- * .group()
+ * .coGroup()
* and many more.
* Have a look at the programming guide for the Java API:
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/849e398a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f18b4c5..3874849 100644
--- a/pom.xml
+++ b/pom.xml
@@ -354,7 +354,7 @@ under the License.
<jdk>1.8</jdk>
</activation>
<modules>
- <module>flink-java8-tests</module>
+ <module>flink-java8</module>
</modules>
<build>
<plugins>