You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/27 19:18:42 UTC
[1/2] flink git commit: [FLINK-6720] Activate strict checkstyle in
flink-java8
Repository: flink
Updated Branches:
refs/heads/master 77b0fb9fe -> 7355a59f4
[FLINK-6720] Activate strict checkstyle in flink-java8
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7085de4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7085de4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7085de4
Branch: refs/heads/master
Commit: b7085de440e0e29c010d242846a74d3fd923cde7
Parents: 77b0fb9
Author: zentol <ch...@apache.org>
Authored: Fri May 26 08:45:36 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat May 27 19:24:23 2017 +0200
----------------------------------------------------------------------
flink-java8/pom.xml | 34 +++++
.../examples/java8/relational/TPCHQuery10.java | 124 +++++++++----------
.../examples/java8/wordcount/WordCount.java | 59 +++++----
.../examples/java8/wordcount/WordCount.java | 57 ++++-----
.../java/type/lambdas/LambdaExtractionTest.java | 40 +++---
.../org/apache/flink/cep/CEPLambdaTest.java | 16 ++-
.../runtime/util/JarFileCreatorLambdaTest.java | 12 +-
.../util/jartestprogram/FilterLambda1.java | 4 +-
.../util/jartestprogram/FilterLambda2.java | 4 +
.../util/jartestprogram/FilterLambda3.java | 3 +
.../util/jartestprogram/FilterLambda4.java | 3 +
.../util/jartestprogram/UtilFunction.java | 3 +
.../jartestprogram/UtilFunctionWrapper.java | 6 +
.../runtime/util/jartestprogram/WordFilter.java | 3 +
.../lambdas/AllGroupReduceITCase.java | 3 +
.../javaApiOperators/lambdas/CoGroupITCase.java | 41 +++---
.../javaApiOperators/lambdas/CrossITCase.java | 7 +-
.../javaApiOperators/lambdas/FilterITCase.java | 53 ++++----
.../lambdas/FlatJoinITCase.java | 9 +-
.../javaApiOperators/lambdas/FlatMapITCase.java | 3 +
.../lambdas/GroupReduceITCase.java | 17 +--
.../javaApiOperators/lambdas/JoinITCase.java | 9 +-
.../javaApiOperators/lambdas/MapITCase.java | 5 +-
.../javaApiOperators/lambdas/ReduceITCase.java | 47 +++----
24 files changed, 328 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index fb7da02..bfc9cdd 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -230,6 +230,40 @@ under the License.
</lifecycleMappingMetadata>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+ <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <executions>
+ <!--
+ Execute checkstyle after compilation but before tests.
+
+ This ensures that any parsing or type checking errors are from
+ javac, so they look as expected. Beyond that, we want to
+ fail as early as possible.
+ -->
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</pluginManagement>
</build>
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
index 1ad4f41..c0fce4d 100644
--- a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
+++ b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
@@ -30,71 +30,65 @@ import org.apache.flink.api.java.tuple.Tuple6;
* This program implements a modified version of the TPC-H query 10.
* The original query can be found at
* <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
- *
- * <p>
- * This program implements the following SQL equivalent:
- *
- * <p>
- * <pre>{@code
- * SELECT
+ *
+ * <p>This program implements the following SQL equivalent:
+ *
+ * <p><pre>{@code
+ * SELECT
* c_custkey,
- * c_name,
+ * c_name,
* c_address,
- * n_name,
+ * n_name,
* c_acctbal
- * SUM(l_extendedprice * (1 - l_discount)) AS revenue,
- * FROM
- * customer,
- * orders,
- * lineitem,
- * nation
- * WHERE
- * c_custkey = o_custkey
- * AND l_orderkey = o_orderkey
- * AND YEAR(o_orderdate) > '1990'
- * AND l_returnflag = 'R'
- * AND c_nationkey = n_nationkey
- * GROUP BY
- * c_custkey,
- * c_name,
- * c_acctbal,
- * n_name,
+ * SUM(l_extendedprice * (1 - l_discount)) AS revenue,
+ * FROM
+ * customer,
+ * orders,
+ * lineitem,
+ * nation
+ * WHERE
+ * c_custkey = o_custkey
+ * AND l_orderkey = o_orderkey
+ * AND YEAR(o_orderdate) > '1990'
+ * AND l_returnflag = 'R'
+ * AND c_nationkey = n_nationkey
+ * GROUP BY
+ * c_custkey,
+ * c_name,
+ * c_acctbal,
+ * n_name,
* c_address
* }</pre>
- *
- * <p>
- * Compared to the original TPC-H query this version does not print
+ *
+ * <p>Compared to the original TPC-H query this version does not print
* c_phone and c_comment, only filters by years greater than 1990 instead of
* a period of 3 months, and does not sort the result by revenue.
- *
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator
+ *
+ * <p>Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- *
- * <p>
- * Usage: <code>TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path></code><br>
- *
- * <p>
- * This example shows how to use:
+ *
+ * <p>Usage: <code>TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path></code><br>
+ *
+ * <p>This example shows how to use:
* <ul>
* <li> inline-defined functions using Java 8 Lambda Expressions
* </ul>
*/
public class TPCHQuery10 {
-
+
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
+
+ if (!parseParameters(args)) {
return;
}
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- // get customer data set: (custkey, name, address, nationkey, acctbal)
+ // get customer data set: (custkey, name, address, nationkey, acctbal)
DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
// get orders data set: (orderkey, custkey, orderdate)
@@ -111,20 +105,20 @@ public class TPCHQuery10 {
// filter by year
orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
// project fields out that are no longer required
- .project(0,1);
+ .project(0, 1);
// lineitems filtered by flag: (orderkey, extendedprice, discount)
- DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag =
+ DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag =
// filter by flag
lineitems.filter(lineitem -> lineitem.f3.equals("R"))
// project fields out that are no longer required
- .project(0,1,2);
+ .project(0, 1, 2);
// join orders with lineitems: (custkey, extendedprice, discount)
- DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey =
+ DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey =
ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
.where(0).equalTo(0)
- .projectFirst(1).projectSecond(1,2);
+ .projectFirst(1).projectSecond(1, 2);
// aggregate for revenue: (custkey, revenue)
DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
@@ -138,36 +132,36 @@ public class TPCHQuery10 {
DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
.joinWithTiny(nations)
.where(3).equalTo(0)
- .projectFirst(0,1,2).projectSecond(1).projectFirst(4);
+ .projectFirst(0, 1, 2).projectSecond(1).projectFirst(4);
// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
- DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue =
+ DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue =
customerWithNation.join(revenueOfCustomerKey)
.where(0).equalTo(0)
- .projectFirst(0,1,2,3,4).projectSecond(1);
+ .projectFirst(0, 1, 2, 3, 4).projectSecond(1);
// emit result
customerWithRevenue.writeAsCsv(outputPath);
-
+
// execute program
env.execute("TPCH Query 10 Example");
-
+
}
-
+
// *************************************************************************
// UTIL METHODS
// *************************************************************************
-
+
private static String customerPath;
private static String ordersPath;
private static String lineitemPath;
private static String nationPath;
private static String outputPath;
-
+
private static boolean parseParameters(String[] programArguments) {
-
- if(programArguments.length > 0) {
- if(programArguments.length == 5) {
+
+ if (programArguments.length > 0) {
+ if (programArguments.length == 5) {
customerPath = programArguments[0];
ordersPath = programArguments[1];
lineitemPath = programArguments[2];
@@ -180,20 +174,20 @@ public class TPCHQuery10 {
} else {
System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
" Due to legal restrictions, we can not ship generated data.\n" +
- " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+ " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
" Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
return false;
}
return true;
}
-
+
private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
return env.readCsvFile(customerPath)
.fieldDelimiter("|")
.includeFields("11110100")
.types(Integer.class, String.class, String.class, Integer.class, Double.class);
}
-
+
private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
return env.readCsvFile(ordersPath)
.fieldDelimiter("|")
@@ -207,12 +201,12 @@ public class TPCHQuery10 {
.includeFields("1000011010000000")
.types(Integer.class, Double.class, Double.class, String.class);
}
-
+
private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
return env.readCsvFile(nationPath)
.fieldDelimiter("|")
.includeFields("1100")
.types(Integer.class, String.class);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
index aead125..0130dec 100644
--- a/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
+++ b/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
@@ -18,51 +18,48 @@
package org.apache.flink.examples.java8.wordcount;
-import java.util.Arrays;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
+import java.util.Arrays;
+
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram
- * over text files.
- *
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- *
- * <p>
- * Usage: <code>WordCount <text path> <result path></code><br>
+ * over text files.
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>write a compact Flink program with Java 8 Lambda Expressions.
* </ul>
- *
+ *
*/
public class WordCount {
-
+
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
+
+ if (!parseParameters(args)) {
return;
}
-
+
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// get input data
DataSet<String> text = getTextDataSet(env);
-
- DataSet<Tuple2<String, Integer>> counts =
+
+ DataSet<Tuple2<String, Integer>> counts =
// normalize and split each line
text.map(line -> line.toLowerCase().split("\\W+"))
// convert splitted line in pairs (2-tuples) containing: (word,1)
@@ -77,30 +74,30 @@ public class WordCount {
.sum(1);
// emit result
- if(fileOutput) {
+ if (fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ");
} else {
counts.print();
}
-
+
// execute program
env.execute("WordCount Example");
}
-
+
// *************************************************************************
// UTIL METHODS
// *************************************************************************
-
+
private static boolean fileOutput = false;
private static String textPath;
private static String outputPath;
-
+
private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
+
+ if (args.length > 0) {
// parse input arguments
fileOutput = true;
- if(args.length == 2) {
+ if (args.length == 2) {
textPath = args[0];
outputPath = args[1];
} else {
@@ -114,9 +111,9 @@ public class WordCount {
}
return true;
}
-
+
private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
+ if (fileOutput) {
// read the text file from given input path
return env.readTextFile(textPath);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
index 80b0ce0..f991433 100644
--- a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
+++ b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
@@ -18,51 +18,48 @@
package org.apache.flink.streaming.examples.java8.wordcount;
-import java.util.Arrays;
-
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
+import java.util.Arrays;
+
/**
* Implements the streaming "WordCount" program that computes a simple word occurrences
- * over text files.
- *
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- *
- * <p>
- * Usage: <code>WordCount <text path> <result path></code><br>
+ * over text files.
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>write a compact Flink Streaming program with Java 8 Lambda Expressions.
* </ul>
- *
+ *
*/
public class WordCount {
-
+
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
+
+ if (!parseParameters(args)) {
return;
}
-
+
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
+
// get input data
DataStream<String> text = getTextDataStream(env);
-
- DataStream<Tuple2<String, Integer>> counts =
+
+ DataStream<Tuple2<String, Integer>> counts =
// normalize and split each line
text.map(line -> line.toLowerCase().split("\\W+"))
// convert splitted line in pairs (2-tuples) containing: (word,1)
@@ -77,30 +74,30 @@ public class WordCount {
.sum(1);
// emit result
- if(fileOutput) {
+ if (fileOutput) {
counts.writeAsCsv(outputPath);
} else {
counts.print();
}
-
+
// execute program
env.execute("Streaming WordCount Example");
}
-
+
// *************************************************************************
// UTIL METHODS
// *************************************************************************
-
+
private static boolean fileOutput = false;
private static String textPath;
private static String outputPath;
-
+
private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
+
+ if (args.length > 0) {
// parse input arguments
fileOutput = true;
- if(args.length == 2) {
+ if (args.length == 2) {
textPath = args[0];
outputPath = args[1];
} else {
@@ -114,7 +111,7 @@ public class WordCount {
}
return true;
}
-
+
private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
if (fileOutput) {
// read the text file from given input path
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index 0d7415a..64ff605 100644
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -18,14 +18,6 @@
package org.apache.flink.api.java.type.lambdas;
-import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import org.junit.Assert;
-
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -44,8 +36,18 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
+
+import org.junit.Assert;
import org.junit.Test;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests the type extractor for lambda functions.
+ */
@SuppressWarnings("serial")
public class LambdaExtractionTest {
@@ -54,12 +56,16 @@ public class LambdaExtractionTest {
try {
MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
@Override
- public Integer map(String value) { return Integer.parseInt(value); }
+ public Integer map(String value) {
+ return Integer.parseInt(value);
+ }
};
MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
@Override
- public Integer map(String value) { return Integer.parseInt(value); }
+ public Integer map(String value) {
+ return Integer.parseInt(value);
+ }
};
MapFunction<?, ?> fromProperClass = new StaticMapper();
@@ -90,19 +96,21 @@ public class LambdaExtractionTest {
}
}
- public static class StaticMapper implements MapFunction<String, Integer> {
+ private static class StaticMapper implements MapFunction<String, Integer> {
@Override
- public Integer map(String value) { return Integer.parseInt(value); }
+ public Integer map(String value) {
+ return Integer.parseInt(value);
+ }
}
- public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
+ private interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
@Override
Tuple2<T, Long> map(T value) throws Exception;
}
private static final MapFunction<String, Integer> STATIC_LAMBDA = Integer::parseInt;
- public static class MyClass {
+ private static class MyClass {
private String s = "mystring";
public MapFunction<Integer, String> getMapFunction() {
@@ -253,7 +261,7 @@ public class LambdaExtractionTest {
Assert.assertTrue(ti instanceof MissingTypeInfo);
}
- public static class MyType {
+ private static class MyType {
private int key;
public int getKey() {
@@ -283,7 +291,7 @@ public class LambdaExtractionTest {
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
}
- public static class MySubtype extends MyType {
+ private static class MySubtype extends MyType {
public boolean test;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
index 03fb3c6..4eff037 100644
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
@@ -33,15 +33,24 @@ import org.junit.Test;
import java.util.List;
import java.util.Map;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+/**
+ * Tests for lambda support in CEP.
+ */
public class CEPLambdaTest extends TestLogger {
+ /**
+ * Test event class.
+ */
public static class EventA {}
+ /**
+ * Test event class.
+ */
public static class EventB {}
/**
- * Tests that a Java8 lambda can be passed as a CEP select function
+ * Tests that a Java8 lambda can be passed as a CEP select function.
*/
@Test
@Ignore
@@ -49,7 +58,6 @@ public class CEPLambdaTest extends TestLogger {
TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
-
DataStream<EventA> inputStream = new DataStream<>(
StreamExecutionEnvironment.getExecutionEnvironment(),
new SourceTransformation<>(
@@ -70,7 +78,7 @@ public class CEPLambdaTest extends TestLogger {
}
/**
- * Tests that a Java8 lambda can be passed as a CEP flat select function
+ * Tests that a Java8 lambda can be passed as a CEP flat select function.
*/
@Test
@Ignore
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
index d90f096..ca11275 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.util;
-
import org.apache.flink.runtime.util.jartestprogram.FilterLambda1;
import org.apache.flink.runtime.util.jartestprogram.FilterLambda2;
import org.apache.flink.runtime.util.jartestprogram.FilterLambda3;
@@ -34,9 +33,12 @@ import java.util.Set;
import java.util.jar.JarInputStream;
import java.util.zip.ZipEntry;
+/**
+ * Tests for the {@link JarFileCreator}.
+ */
public class JarFileCreatorLambdaTest {
@Test
- public void TestFilterFunctionOnLambda1() throws Exception {
+ public void testFilterFunctionOnLambda1() throws Exception {
File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
JarFileCreator jfc = new JarFileCreator(out);
jfc.addClass(FilterLambda1.class)
@@ -51,7 +53,7 @@ public class JarFileCreatorLambdaTest {
}
@Test
- public void TestFilterFunctionOnLambda2() throws Exception{
+ public void testFilterFunctionOnLambda2() throws Exception{
File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
JarFileCreator jfc = new JarFileCreator(out);
jfc.addClass(FilterLambda2.class)
@@ -66,7 +68,7 @@ public class JarFileCreatorLambdaTest {
}
@Test
- public void TestFilterFunctionOnLambda3() throws Exception {
+ public void testFilterFunctionOnLambda3() throws Exception {
File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
JarFileCreator jfc = new JarFileCreator(out);
jfc.addClass(FilterLambda3.class)
@@ -82,7 +84,7 @@ public class JarFileCreatorLambdaTest {
}
@Test
- public void TestFilterFunctionOnLambda4() throws Exception {
+ public void testFilterFunctionOnLambda4() throws Exception {
File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
JarFileCreator jfc = new JarFileCreator(out);
jfc.addClass(FilterLambda4.class)
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
index cd0c9e7..12abff9 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
@@ -18,11 +18,13 @@
package org.apache.flink.runtime.util.jartestprogram;
-
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+/**
+ * A lambda filter using a static method.
+ */
public class FilterLambda1 {
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
index 06c279d..9555607 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
@@ -18,9 +18,13 @@
package org.apache.flink.runtime.util.jartestprogram;
+import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+/**
+ * Similar to {@link FilterLambda1}, but the filter lambda is directly passed to {@link DataSet#filter(FilterFunction)}.
+ */
public class FilterLambda2 {
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
index 27fd2d9..b493722 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.util.jartestprogram;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+/**
+ * Similar to {@link FilterLambda2}, but uses a getter to retrieve a lambda filter instance.
+ */
public class FilterLambda3 {
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
index e66adb0..606ef5e 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.util.jartestprogram;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+/**
+ * Similar to {@link FilterLambda3} with additional indirection.
+ */
public class FilterLambda4 {
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
index e662015..1d5394a 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.util.jartestprogram;
import org.apache.flink.api.common.functions.FilterFunction;
+/**
+ * Static factory for a lambda filter function.
+ */
public class UtilFunction {
public static FilterFunction<String> getWordFilter() {
return (v) -> WordFilter.filter(v);
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
index f97cdd8..de8f68a 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
@@ -20,7 +20,13 @@ package org.apache.flink.runtime.util.jartestprogram;
import org.apache.flink.api.common.functions.FilterFunction;
+/**
+ * A wrapper around {@link WordFilter} to introduce additional indirection.
+ */
public class UtilFunctionWrapper {
+ /**
+ * Static factory for a lambda filter function.
+ */
public static class UtilFunction {
public static FilterFunction<String> getWordFilter() {
return (v) -> WordFilter.filter(v);
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
index c6f833a..4a5b16f 100644
--- a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.util.jartestprogram;
+/**
+ * Static filter method for lambda tests.
+ */
public class WordFilter {
public static boolean filter(String value) {
return !value.contains("not");
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
index 1420483..7dada56 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * IT cases for lambda allreduce functions.
+ */
public class AllGroupReduceITCase extends JavaProgramTestBase {
private static final String EXPECTED_RESULT = "aaabacad\n";
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
index 667a786..30362cb 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * IT cases for lambda cogroup functions.
+ */
public class CoGroupITCase extends JavaProgramTestBase {
private static final String EXPECTED_RESULT = "6\n3\n";
@@ -40,26 +43,26 @@ public class CoGroupITCase extends JavaProgramTestBase {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Integer, String>> left = env.fromElements(
- new Tuple2<Integer, String>(1, "hello"),
- new Tuple2<Integer, String>(2, "what's"),
- new Tuple2<Integer, String>(2, "up")
- );
+ new Tuple2<Integer, String>(1, "hello"),
+ new Tuple2<Integer, String>(2, "what's"),
+ new Tuple2<Integer, String>(2, "up")
+ );
DataSet<Tuple2<Integer, String>> right = env.fromElements(
- new Tuple2<Integer, String>(1, "not"),
- new Tuple2<Integer, String>(1, "much"),
- new Tuple2<Integer, String>(2, "really")
- );
+ new Tuple2<Integer, String>(1, "not"),
+ new Tuple2<Integer, String>(1, "much"),
+ new Tuple2<Integer, String>(2, "really")
+ );
DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
- .with((values1, values2, out) -> {
- int sum = 0;
- for (Tuple2<Integer, String> next : values1) {
- sum += next.f0;
- }
- for (Tuple2<Integer, String> next : values2) {
- sum += next.f0;
- }
- out.collect(sum);
- });
+ .with((values1, values2, out) -> {
+ int sum = 0;
+ for (Tuple2<Integer, String> next : values1) {
+ sum += next.f0;
+ }
+ for (Tuple2<Integer, String> next : values2) {
+ sum += next.f0;
+ }
+ out.collect(sum);
+ });
joined.writeAsText(resultPath);
env.execute();
}
@@ -68,4 +71,4 @@ public class CoGroupITCase extends JavaProgramTestBase {
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
index 60916c9..11a360e 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * IT cases for lambda cross functions.
+ */
public class CrossITCase extends JavaProgramTestBase {
private static final String EXPECTED_RESULT = "2,hello not\n" +
@@ -57,8 +60,8 @@ public class CrossITCase extends JavaProgramTestBase {
new Tuple2<Integer, String>(1, "much"),
new Tuple2<Integer, String>(2, "really")
);
- DataSet<Tuple2<Integer,String>> joined = left.cross(right)
- .with((t,s) -> new Tuple2<Integer, String> (t.f0 + s.f0, t.f1 + " " + s.f1));
+ DataSet<Tuple2<Integer, String>> joined = left.cross(right)
+ .with((t, s) -> new Tuple2<> (t.f0 + s.f0, t.f1 + " " + s.f1));
joined.writeAsCsv(resultPath);
env.execute();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
index d83db06..e6bef71 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
@@ -18,15 +18,18 @@
package org.apache.flink.test.javaApiOperators.lambdas;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.util.JavaProgramTestBase;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * IT cases for lambda filter funtions.
+ */
public class FilterITCase extends JavaProgramTestBase {
private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
@@ -35,27 +38,27 @@ public class FilterITCase extends JavaProgramTestBase {
public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
- data.add(new Tuple3<Integer, Long, String>(1,1l,"Hi"));
- data.add(new Tuple3<Integer, Long, String>(2,2l,"Hello"));
- data.add(new Tuple3<Integer, Long, String>(3,2l,"Hello world"));
- data.add(new Tuple3<Integer, Long, String>(4,3l,"Hello world, how are you?"));
- data.add(new Tuple3<Integer, Long, String>(5,3l,"I am fine."));
- data.add(new Tuple3<Integer, Long, String>(6,3l,"Luke Skywalker"));
- data.add(new Tuple3<Integer, Long, String>(7,4l,"Comment#1"));
- data.add(new Tuple3<Integer, Long, String>(8,4l,"Comment#2"));
- data.add(new Tuple3<Integer, Long, String>(9,4l,"Comment#3"));
- data.add(new Tuple3<Integer, Long, String>(10,4l,"Comment#4"));
- data.add(new Tuple3<Integer, Long, String>(11,5l,"Comment#5"));
- data.add(new Tuple3<Integer, Long, String>(12,5l,"Comment#6"));
- data.add(new Tuple3<Integer, Long, String>(13,5l,"Comment#7"));
- data.add(new Tuple3<Integer, Long, String>(14,5l,"Comment#8"));
- data.add(new Tuple3<Integer, Long, String>(15,5l,"Comment#9"));
- data.add(new Tuple3<Integer, Long, String>(16,6l,"Comment#10"));
- data.add(new Tuple3<Integer, Long, String>(17,6l,"Comment#11"));
- data.add(new Tuple3<Integer, Long, String>(18,6l,"Comment#12"));
- data.add(new Tuple3<Integer, Long, String>(19,6l,"Comment#13"));
- data.add(new Tuple3<Integer, Long, String>(20,6l,"Comment#14"));
- data.add(new Tuple3<Integer, Long, String>(21,6l,"Comment#15"));
+ data.add(new Tuple3<>(1, 1L, "Hi"));
+ data.add(new Tuple3<>(2, 2L, "Hello"));
+ data.add(new Tuple3<>(3, 2L, "Hello world"));
+ data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
+ data.add(new Tuple3<>(5, 3L, "I am fine."));
+ data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
+ data.add(new Tuple3<>(7, 4L, "Comment#1"));
+ data.add(new Tuple3<>(8, 4L, "Comment#2"));
+ data.add(new Tuple3<>(9, 4L, "Comment#3"));
+ data.add(new Tuple3<>(10, 4L, "Comment#4"));
+ data.add(new Tuple3<>(11, 5L, "Comment#5"));
+ data.add(new Tuple3<>(12, 5L, "Comment#6"));
+ data.add(new Tuple3<>(13, 5L, "Comment#7"));
+ data.add(new Tuple3<>(14, 5L, "Comment#8"));
+ data.add(new Tuple3<>(15, 5L, "Comment#9"));
+ data.add(new Tuple3<>(16, 6L, "Comment#10"));
+ data.add(new Tuple3<>(17, 6L, "Comment#11"));
+ data.add(new Tuple3<>(18, 6L, "Comment#12"));
+ data.add(new Tuple3<>(19, 6L, "Comment#13"));
+ data.add(new Tuple3<>(20, 6L, "Comment#14"));
+ data.add(new Tuple3<>(21, 6L, "Comment#15"));
Collections.shuffle(data);
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
index 714c14c..e35278d 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * IT cases for lambda join functions.
+ */
public class FlatJoinITCase extends JavaProgramTestBase {
private static final String EXPECTED_RESULT = "2,what's really\n" +
@@ -52,8 +55,8 @@ public class FlatJoinITCase extends JavaProgramTestBase {
new Tuple2<Integer, String>(1, "much"),
new Tuple2<Integer, String>(2, "really")
);
- DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
- .with((t,s,out) -> out.collect(new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1)));
+ DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
+ .with((t, s, out) -> out.collect(new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1)));
joined.writeAsCsv(resultPath);
env.execute();
}
@@ -62,4 +65,4 @@ public class FlatJoinITCase extends JavaProgramTestBase {
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
index 2b0e344..b5211c8 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * IT cases for lambda flatmap functions.
+ */
public class FlatMapITCase extends JavaProgramTestBase {
private static final String EXPECTED_RESULT = "bb\n" +
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
index 23300c8..61061b8 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * IT cases for lambda groupreduce functions.
+ */
public class GroupReduceITCase extends JavaProgramTestBase {
private static final String EXPECTED_RESULT = "abad\n" +
@@ -40,17 +43,17 @@ public class GroupReduceITCase extends JavaProgramTestBase {
protected void testProgram() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Tuple2<Integer,String>> stringDs = env.fromElements(
- new Tuple2<Integer,String>(1, "aa"),
- new Tuple2<Integer,String>(2, "ab"),
- new Tuple2<Integer,String>(1, "ac"),
- new Tuple2<Integer,String>(2, "ad")
+ DataSet<Tuple2<Integer, String>> stringDs = env.fromElements(
+ new Tuple2<>(1, "aa"),
+ new Tuple2<>(2, "ab"),
+ new Tuple2<>(1, "ac"),
+ new Tuple2<>(2, "ad")
);
DataSet<String> concatDs = stringDs
.groupBy(0)
.reduceGroup((values, out) -> {
String conc = "";
- for (Tuple2<Integer,String> next : values) {
+ for (Tuple2<Integer, String> next : values) {
conc = conc.concat(next.f1);
}
out.collect(conc);
@@ -63,4 +66,4 @@ public class GroupReduceITCase extends JavaProgramTestBase {
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
index aef35ac..0e78212 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
@@ -23,8 +23,11 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * IT cases for lambda join functions.
+ */
public class JoinITCase extends JavaProgramTestBase {
-
+
private static final String EXPECTED_RESULT = "2,what's really\n" +
"2,up really\n" +
"1,hello not\n" +
@@ -52,8 +55,8 @@ public class JoinITCase extends JavaProgramTestBase {
new Tuple2<Integer, String>(1, "much"),
new Tuple2<Integer, String>(2, "really")
);
- DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
- .with((t,s) -> new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1));
+ DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
+ .with((t, s) -> new Tuple2<>(t.f0, t.f1 + " " + s.f1));
joined.writeAsCsv(resultPath);
env.execute();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
index 87c1fa5..355d38d 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -22,9 +22,12 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * IT cases for lambda map functions.
+ */
public class MapITCase extends JavaProgramTestBase {
- public static class Trade {
+ private static class Trade {
public String v;
http://git-wip-us.apache.org/repos/asf/flink/blob/b7085de4/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
index 52c215f..24b39d7 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
@@ -18,10 +18,6 @@
package org.apache.flink.test.javaApiOperators.lambdas;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -29,6 +25,13 @@ import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.test.util.JavaProgramTestBase;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * IT cases for lambda reduce functions.
+ */
public class ReduceITCase extends JavaProgramTestBase {
private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
@@ -41,25 +44,25 @@ public class ReduceITCase extends JavaProgramTestBase {
"5,11,10,GHI,1\n" +
"5,29,0,P-),2\n" +
"5,25,0,P-),3\n";
-
+
public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(1,1l,0,"Hallo",1l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(2,2l,1,"Hallo Welt",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(2,3l,2,"Hallo Welt wie",1l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,4l,3,"Hallo Welt wie gehts?",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,5l,4,"ABC",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,6l,5,"BCD",3l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,7l,6,"CDE",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,8l,7,"DEF",1l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,9l,8,"EFG",1l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,10l,9,"FGH",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,11l,10,"GHI",1l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,12l,11,"HIJ",3l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,13l,12,"IJK",3l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,14l,13,"JKL",2l));
- data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,15l,14,"KLM",2l));
+ data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+ data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+ data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+ data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+ data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+ data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+ data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+ data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+ data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+ data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+ data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+ data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+ data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+ data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+ data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
Collections.shuffle(data);
@@ -74,9 +77,9 @@ public class ReduceITCase extends JavaProgramTestBase {
return env.fromCollection(data, type);
}
-
+
private String resultPath;
-
+
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
[2/2] flink git commit: [FLINK-6720] Rename java8 javaApiOperators
package to api.java.operators
Posted by ch...@apache.org.
[FLINK-6720] Rename java8 javaApiOperators package to api.java.operators
This closes #3999.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7355a59f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7355a59f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7355a59f
Branch: refs/heads/master
Commit: 7355a59f48fcda834a256f7925e00c66312494ed
Parents: b7085de
Author: zentol <ch...@apache.org>
Authored: Fri May 26 09:22:44 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat May 27 19:24:26 2017 +0200
----------------------------------------------------------------------
.../operators/lambdas/AllGroupReduceITCase.java | 59 ++++++++++
.../java/operators/lambdas/CoGroupITCase.java | 74 +++++++++++++
.../api/java/operators/lambdas/CrossITCase.java | 73 +++++++++++++
.../java/operators/lambdas/FilterITCase.java | 91 ++++++++++++++++
.../java/operators/lambdas/FlatJoinITCase.java | 68 ++++++++++++
.../java/operators/lambdas/FlatMapITCase.java | 56 ++++++++++
.../operators/lambdas/GroupReduceITCase.java | 69 ++++++++++++
.../api/java/operators/lambdas/JoinITCase.java | 69 ++++++++++++
.../api/java/operators/lambdas/MapITCase.java | 74 +++++++++++++
.../java/operators/lambdas/ReduceITCase.java | 109 +++++++++++++++++++
.../lambdas/AllGroupReduceITCase.java | 59 ----------
.../javaApiOperators/lambdas/CoGroupITCase.java | 74 -------------
.../javaApiOperators/lambdas/CrossITCase.java | 73 -------------
.../javaApiOperators/lambdas/FilterITCase.java | 91 ----------------
.../lambdas/FlatJoinITCase.java | 68 ------------
.../javaApiOperators/lambdas/FlatMapITCase.java | 56 ----------
.../lambdas/GroupReduceITCase.java | 69 ------------
.../javaApiOperators/lambdas/JoinITCase.java | 69 ------------
.../javaApiOperators/lambdas/MapITCase.java | 74 -------------
.../javaApiOperators/lambdas/ReduceITCase.java | 109 -------------------
20 files changed, 742 insertions(+), 742 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
new file mode 100644
index 0000000..cee34af
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/AllGroupReduceITCase.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda allreduce functions.
+ */
+public class AllGroupReduceITCase extends JavaProgramTestBase {
+
+ private static final String EXPECTED_RESULT = "aaabacad\n";
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+ DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
+ String conc = "";
+ for (String s : values) {
+ conc = conc.concat(s);
+ }
+ out.collect(conc);
+ });
+ concatDs.writeAsText(resultPath);
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
new file mode 100644
index 0000000..a70f37a
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CoGroupITCase.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda cogroup functions.
+ */
+public class CoGroupITCase extends JavaProgramTestBase {
+
+ private static final String EXPECTED_RESULT = "6\n3\n";
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Integer, String>> left = env.fromElements(
+ new Tuple2<Integer, String>(1, "hello"),
+ new Tuple2<Integer, String>(2, "what's"),
+ new Tuple2<Integer, String>(2, "up")
+ );
+ DataSet<Tuple2<Integer, String>> right = env.fromElements(
+ new Tuple2<Integer, String>(1, "not"),
+ new Tuple2<Integer, String>(1, "much"),
+ new Tuple2<Integer, String>(2, "really")
+ );
+ DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
+ .with((values1, values2, out) -> {
+ int sum = 0;
+ for (Tuple2<Integer, String> next : values1) {
+ sum += next.f0;
+ }
+ for (Tuple2<Integer, String> next : values2) {
+ sum += next.f0;
+ }
+ out.collect(sum);
+ });
+ joined.writeAsText(resultPath);
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
new file mode 100644
index 0000000..32cd910
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/CrossITCase.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda cross functions.
+ */
+public class CrossITCase extends JavaProgramTestBase {
+
+ private static final String EXPECTED_RESULT = "2,hello not\n" +
+ "3,what's not\n" +
+ "3,up not\n" +
+ "2,hello much\n" +
+ "3,what's much\n" +
+ "3,up much\n" +
+ "3,hello really\n" +
+ "4,what's really\n" +
+ "4,up really";
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Integer, String>> left = env.fromElements(
+ new Tuple2<Integer, String>(1, "hello"),
+ new Tuple2<Integer, String>(2, "what's"),
+ new Tuple2<Integer, String>(2, "up")
+ );
+ DataSet<Tuple2<Integer, String>> right = env.fromElements(
+ new Tuple2<Integer, String>(1, "not"),
+ new Tuple2<Integer, String>(1, "much"),
+ new Tuple2<Integer, String>(2, "really")
+ );
+ DataSet<Tuple2<Integer, String>> joined = left.cross(right)
+ .with((t, s) -> new Tuple2<> (t.f0 + s.f0, t.f1 + " " + s.f1));
+ joined.writeAsCsv(resultPath);
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
new file mode 100644
index 0000000..345b119
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FilterITCase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * IT cases for lambda filter funtions.
+ */
+public class FilterITCase extends JavaProgramTestBase {
+
+ private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n";
+
+ public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
+
+ List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
+ data.add(new Tuple3<>(1, 1L, "Hi"));
+ data.add(new Tuple3<>(2, 2L, "Hello"));
+ data.add(new Tuple3<>(3, 2L, "Hello world"));
+ data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
+ data.add(new Tuple3<>(5, 3L, "I am fine."));
+ data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
+ data.add(new Tuple3<>(7, 4L, "Comment#1"));
+ data.add(new Tuple3<>(8, 4L, "Comment#2"));
+ data.add(new Tuple3<>(9, 4L, "Comment#3"));
+ data.add(new Tuple3<>(10, 4L, "Comment#4"));
+ data.add(new Tuple3<>(11, 5L, "Comment#5"));
+ data.add(new Tuple3<>(12, 5L, "Comment#6"));
+ data.add(new Tuple3<>(13, 5L, "Comment#7"));
+ data.add(new Tuple3<>(14, 5L, "Comment#8"));
+ data.add(new Tuple3<>(15, 5L, "Comment#9"));
+ data.add(new Tuple3<>(16, 6L, "Comment#10"));
+ data.add(new Tuple3<>(17, 6L, "Comment#11"));
+ data.add(new Tuple3<>(18, 6L, "Comment#12"));
+ data.add(new Tuple3<>(19, 6L, "Comment#13"));
+ data.add(new Tuple3<>(20, 6L, "Comment#14"));
+ data.add(new Tuple3<>(21, 6L, "Comment#15"));
+
+ Collections.shuffle(data);
+
+ return env.fromCollection(data);
+ }
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+ filter(value -> value.f2.contains("world"));
+ filterDs.writeAsCsv(resultPath);
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
new file mode 100644
index 0000000..f793450
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatJoinITCase.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda join functions.
+ */
+public class FlatJoinITCase extends JavaProgramTestBase {
+
+ private static final String EXPECTED_RESULT = "2,what's really\n" +
+ "2,up really\n" +
+ "1,hello not\n" +
+ "1,hello much\n";
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Integer, String>> left = env.fromElements(
+ new Tuple2<Integer, String>(1, "hello"),
+ new Tuple2<Integer, String>(2, "what's"),
+ new Tuple2<Integer, String>(2, "up")
+ );
+ DataSet<Tuple2<Integer, String>> right = env.fromElements(
+ new Tuple2<Integer, String>(1, "not"),
+ new Tuple2<Integer, String>(1, "much"),
+ new Tuple2<Integer, String>(2, "really")
+ );
+ DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
+ .with((t, s, out) -> out.collect(new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1)));
+ joined.writeAsCsv(resultPath);
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
new file mode 100644
index 0000000..d395d7d
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/FlatMapITCase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda flatmap functions.
+ */
+public class FlatMapITCase extends JavaProgramTestBase {
+
+ private static final String EXPECTED_RESULT = "bb\n" +
+ "bb\n" +
+ "bc\n" +
+ "bd\n";
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+ DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
+ flatMappedDs.writeAsText(resultPath);
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
new file mode 100644
index 0000000..53db541
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/GroupReduceITCase.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda groupreduce functions.
+ */
+public class GroupReduceITCase extends JavaProgramTestBase {
+
+ private static final String EXPECTED_RESULT = "abad\n" +
+ "aaac\n";
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Integer, String>> stringDs = env.fromElements(
+ new Tuple2<>(1, "aa"),
+ new Tuple2<>(2, "ab"),
+ new Tuple2<>(1, "ac"),
+ new Tuple2<>(2, "ad")
+ );
+ DataSet<String> concatDs = stringDs
+ .groupBy(0)
+ .reduceGroup((values, out) -> {
+ String conc = "";
+ for (Tuple2<Integer, String> next : values) {
+ conc = conc.concat(next.f1);
+ }
+ out.collect(conc);
+ });
+ concatDs.writeAsText(resultPath);
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
new file mode 100644
index 0000000..d86ea49
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/JoinITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda join functions.
+ */
+public class JoinITCase extends JavaProgramTestBase {
+
+ private static final String EXPECTED_RESULT = "2,what's really\n" +
+ "2,up really\n" +
+ "1,hello not\n" +
+ "1,hello much\n";
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Integer, String>> left = env.fromElements(
+ new Tuple2<Integer, String>(1, "hello"),
+ new Tuple2<Integer, String>(2, "what's"),
+ new Tuple2<Integer, String>(2, "up")
+ );
+ DataSet<Tuple2<Integer, String>> right = env.fromElements(
+ new Tuple2<Integer, String>(1, "not"),
+ new Tuple2<Integer, String>(1, "much"),
+ new Tuple2<Integer, String>(2, "really")
+ );
+ DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
+ .with((t, s) -> new Tuple2<>(t.f0, t.f1 + " " + s.f1));
+ joined.writeAsCsv(resultPath);
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
new file mode 100644
index 0000000..15a9b9d
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/MapITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * IT cases for lambda map functions.
+ */
+public class MapITCase extends JavaProgramTestBase {
+
+ private static class Trade {
+
+ public String v;
+
+ public Trade(String v) {
+ this.v = v;
+ }
+
+ @Override
+ public String toString() {
+ return v;
+ }
+ }
+
+ private static final String EXPECTED_RESULT = "22\n" +
+ "22\n" +
+ "23\n" +
+ "24\n";
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
+ DataSet<String> mappedDs = stringDs
+ .map(Object::toString)
+ .map (s -> s.replace("1", "2"))
+ .map(Trade::new)
+ .map(Trade::toString);
+ mappedDs.writeAsText(resultPath);
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
new file mode 100644
index 0000000..712132c
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/test/api/java/operators/lambdas/ReduceITCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.api.java.operators.lambdas;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * IT cases for lambda reduce functions.
+ */
+public class ReduceITCase extends JavaProgramTestBase {
+
+ private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "3,9,0,P-),2\n" +
+ "3,6,5,BCD,3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,10,GHI,1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
+
+ public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
+
+ List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
+ data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+ data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+ data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+ data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+ data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+ data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+ data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+ data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+ data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+ data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+ data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+ data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+ data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+ data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+ data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
+
+ Collections.shuffle(data);
+
+ TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new
+ TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO
+ );
+
+ return env.fromCollection(data, type);
+ }
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+ .groupBy(4, 0)
+ .reduce((in1, in2) -> {
+ Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
+ out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
+ return out;
+ });
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
deleted file mode 100644
index 7dada56..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/AllGroupReduceITCase.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda allreduce functions.
- */
-public class AllGroupReduceITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "aaabacad\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
- DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
- String conc = "";
- for (String s : values) {
- conc = conc.concat(s);
- }
- out.collect(conc);
- });
- concatDs.writeAsText(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
deleted file mode 100644
index 30362cb..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cogroup functions.
- */
-public class CoGroupITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "6\n3\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Integer, String>> left = env.fromElements(
- new Tuple2<Integer, String>(1, "hello"),
- new Tuple2<Integer, String>(2, "what's"),
- new Tuple2<Integer, String>(2, "up")
- );
- DataSet<Tuple2<Integer, String>> right = env.fromElements(
- new Tuple2<Integer, String>(1, "not"),
- new Tuple2<Integer, String>(1, "much"),
- new Tuple2<Integer, String>(2, "really")
- );
- DataSet<Integer> joined = left.coGroup(right).where(0).equalTo(0)
- .with((values1, values2, out) -> {
- int sum = 0;
- for (Tuple2<Integer, String> next : values1) {
- sum += next.f0;
- }
- for (Tuple2<Integer, String> next : values2) {
- sum += next.f0;
- }
- out.collect(sum);
- });
- joined.writeAsText(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
deleted file mode 100644
index 11a360e..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda cross functions.
- */
-public class CrossITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "2,hello not\n" +
- "3,what's not\n" +
- "3,up not\n" +
- "2,hello much\n" +
- "3,what's much\n" +
- "3,up much\n" +
- "3,hello really\n" +
- "4,what's really\n" +
- "4,up really";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Integer, String>> left = env.fromElements(
- new Tuple2<Integer, String>(1, "hello"),
- new Tuple2<Integer, String>(2, "what's"),
- new Tuple2<Integer, String>(2, "up")
- );
- DataSet<Tuple2<Integer, String>> right = env.fromElements(
- new Tuple2<Integer, String>(1, "not"),
- new Tuple2<Integer, String>(1, "much"),
- new Tuple2<Integer, String>(2, "really")
- );
- DataSet<Tuple2<Integer, String>> joined = left.cross(right)
- .with((t, s) -> new Tuple2<> (t.f0 + s.f0, t.f1 + " " + s.f1));
- joined.writeAsCsv(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
deleted file mode 100644
index e6bef71..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda filter funtions.
- */
-public class FilterITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n";
-
- public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
-
- List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
- data.add(new Tuple3<>(1, 1L, "Hi"));
- data.add(new Tuple3<>(2, 2L, "Hello"));
- data.add(new Tuple3<>(3, 2L, "Hello world"));
- data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
- data.add(new Tuple3<>(5, 3L, "I am fine."));
- data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
- data.add(new Tuple3<>(7, 4L, "Comment#1"));
- data.add(new Tuple3<>(8, 4L, "Comment#2"));
- data.add(new Tuple3<>(9, 4L, "Comment#3"));
- data.add(new Tuple3<>(10, 4L, "Comment#4"));
- data.add(new Tuple3<>(11, 5L, "Comment#5"));
- data.add(new Tuple3<>(12, 5L, "Comment#6"));
- data.add(new Tuple3<>(13, 5L, "Comment#7"));
- data.add(new Tuple3<>(14, 5L, "Comment#8"));
- data.add(new Tuple3<>(15, 5L, "Comment#9"));
- data.add(new Tuple3<>(16, 6L, "Comment#10"));
- data.add(new Tuple3<>(17, 6L, "Comment#11"));
- data.add(new Tuple3<>(18, 6L, "Comment#12"));
- data.add(new Tuple3<>(19, 6L, "Comment#13"));
- data.add(new Tuple3<>(20, 6L, "Comment#14"));
- data.add(new Tuple3<>(21, 6L, "Comment#15"));
-
- Collections.shuffle(data);
-
- return env.fromCollection(data);
- }
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
- filter(value -> value.f2.contains("world"));
- filterDs.writeAsCsv(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
deleted file mode 100644
index e35278d..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class FlatJoinITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "2,what's really\n" +
- "2,up really\n" +
- "1,hello not\n" +
- "1,hello much\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Integer, String>> left = env.fromElements(
- new Tuple2<Integer, String>(1, "hello"),
- new Tuple2<Integer, String>(2, "what's"),
- new Tuple2<Integer, String>(2, "up")
- );
- DataSet<Tuple2<Integer, String>> right = env.fromElements(
- new Tuple2<Integer, String>(1, "not"),
- new Tuple2<Integer, String>(1, "much"),
- new Tuple2<Integer, String>(2, "really")
- );
- DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
- .with((t, s, out) -> out.collect(new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1)));
- joined.writeAsCsv(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
deleted file mode 100644
index b5211c8..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda flatmap functions.
- */
-public class FlatMapITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "bb\n" +
- "bb\n" +
- "bc\n" +
- "bd\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
- DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
- flatMappedDs.writeAsText(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
deleted file mode 100644
index 61061b8..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda groupreduce functions.
- */
-public class GroupReduceITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "abad\n" +
- "aaac\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Integer, String>> stringDs = env.fromElements(
- new Tuple2<>(1, "aa"),
- new Tuple2<>(2, "ab"),
- new Tuple2<>(1, "ac"),
- new Tuple2<>(2, "ad")
- );
- DataSet<String> concatDs = stringDs
- .groupBy(0)
- .reduceGroup((values, out) -> {
- String conc = "";
- for (Tuple2<Integer, String> next : values) {
- conc = conc.concat(next.f1);
- }
- out.collect(conc);
- });
- concatDs.writeAsText(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
deleted file mode 100644
index 0e78212..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda join functions.
- */
-public class JoinITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "2,what's really\n" +
- "2,up really\n" +
- "1,hello not\n" +
- "1,hello much\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Integer, String>> left = env.fromElements(
- new Tuple2<Integer, String>(1, "hello"),
- new Tuple2<Integer, String>(2, "what's"),
- new Tuple2<Integer, String>(2, "up")
- );
- DataSet<Tuple2<Integer, String>> right = env.fromElements(
- new Tuple2<Integer, String>(1, "not"),
- new Tuple2<Integer, String>(1, "much"),
- new Tuple2<Integer, String>(2, "really")
- );
- DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
- .with((t, s) -> new Tuple2<>(t.f0, t.f1 + " " + s.f1));
- joined.writeAsCsv(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
deleted file mode 100644
index 355d38d..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * IT cases for lambda map functions.
- */
-public class MapITCase extends JavaProgramTestBase {
-
- private static class Trade {
-
- public String v;
-
- public Trade(String v) {
- this.v = v;
- }
-
- @Override
- public String toString() {
- return v;
- }
- }
-
- private static final String EXPECTED_RESULT = "22\n" +
- "22\n" +
- "23\n" +
- "24\n";
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
- DataSet<String> mappedDs = stringDs
- .map(Object::toString)
- .map (s -> s.replace("1", "2"))
- .map(Trade::new)
- .map(Trade::toString);
- mappedDs.writeAsText(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7355a59f/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
deleted file mode 100644
index 24b39d7..0000000
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.lambdas;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * IT cases for lambda reduce functions.
- */
-public class ReduceITCase extends JavaProgramTestBase {
-
- private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
- "2,3,2,Hallo Welt wie,1\n" +
- "2,2,1,Hallo Welt,2\n" +
- "3,9,0,P-),2\n" +
- "3,6,5,BCD,3\n" +
- "4,17,0,P-),1\n" +
- "4,17,0,P-),2\n" +
- "5,11,10,GHI,1\n" +
- "5,29,0,P-),2\n" +
- "5,25,0,P-),3\n";
-
- public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
-
- List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
- data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
- data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
- data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
- data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
- data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
- data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
- data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
- data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
- data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
- data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
- data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
- data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
- data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
- data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
- data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
-
- Collections.shuffle(data);
-
- TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new
- TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>>(
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO
- );
-
- return env.fromCollection(data, type);
- }
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
- .groupBy(4, 0)
- .reduce((in1, in2) -> {
- Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
- out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
- return out;
- });
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
- }
-}