You are viewing a plain-text version of this content. (The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:07 UTC
[71/82] [abbrv] incubator-flink git commit: Change integration tests to reuse the cluster in order to save startup and shutdown time.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
index 7bde1a4..aa75836 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
@@ -18,35 +18,18 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-@RunWith(Parameterized.class)
public class ProjectITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 1;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+
private String resultPath;
private String expectedResult;
- public ProjectITCase(Configuration config) {
- super(config);
- }
-
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
@@ -54,72 +37,37 @@ public class ProjectITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- expectedResult = ProjectProgs.runProgram(curProgId, resultPath);
+ /*
+ * Projection with tuple fields indexes
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<String, Long, Integer>> projDs = ds.
+ project(3,4,2);
+ projDs.writeAsCsv(resultPath);
+
+ env.execute();
+ expectedResult = "Hallo,1,0\n" +
+ "Hallo Welt,2,1\n" +
+ "Hallo Welt wie,1,2\n" +
+ "Hallo Welt wie gehts?,2,3\n" +
+ "ABC,2,4\n" +
+ "BCD,3,5\n" +
+ "CDE,2,6\n" +
+ "DEF,1,7\n" +
+ "EFG,1,8\n" +
+ "FGH,2,9\n" +
+ "GHI,1,10\n" +
+ "HIJ,3,11\n" +
+ "IJK,3,12\n" +
+ "JKL,2,13\n" +
+ "KLM,2,14\n";
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(expectedResult, resultPath);
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
- }
-
-
- private static class ProjectProgs {
-
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
- /*
- * Projection with tuple fields indexes
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple3<String, Long, Integer>> projDs = ds.
- project(3,4,2);
- projDs.writeAsCsv(resultPath);
-
- env.execute();
- return "Hallo,1,0\n" +
- "Hallo Welt,2,1\n" +
- "Hallo Welt wie,1,2\n" +
- "Hallo Welt wie gehts?,2,3\n" +
- "ABC,2,4\n" +
- "BCD,3,5\n" +
- "CDE,2,6\n" +
- "DEF,1,7\n" +
- "EFG,1,8\n" +
- "FGH,2,9\n" +
- "GHI,1,10\n" +
- "HIJ,3,11\n" +
- "IJK,3,12\n" +
- "JKL,2,13\n" +
- "KLM,2,14\n";
-
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
-
- }
-
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index 1fcacb9..065be67 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -18,11 +18,8 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.Collection;
import java.util.Date;
-import java.util.LinkedList;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -38,371 +35,362 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoWithDateAndEnum;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
-public class ReduceITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 11;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+public class ReduceITCase extends MultipleProgramsTestBase {
+
+ public ReduceITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
private String resultPath;
- private String expectedResult;
-
- public ReduceITCase(Configuration config) {
- super(config);
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+
+ @Test
+ public void testReduceOnTuplesWithKeyFieldSelector() throws Exception {
+ /*
+ * Reduce on tuples with key field selector
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+ groupBy(1).reduce(new Tuple3Reduce("B-)"));
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "5,2,B-)\n" +
+ "15,3,B-)\n" +
+ "34,4,B-)\n" +
+ "65,5,B-)\n" +
+ "111,6,B-)\n";
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = ReduceProgs.runProgram(curProgId, resultPath);
+ @Test
+ public void testReduceOnTupleWithMultipleKeyFieldSelectors() throws Exception{
+ /*
+ * Reduce on tuples with multiple key field selectors
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
+ groupBy(4,0).reduce(new Tuple5Reduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,0,Hallo,1\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "3,9,0,P-),2\n" +
+ "3,6,5,BCD,3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,10,GHI,1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ @Test
+ public void testReduceOnTuplesWithKeyExtractor() throws Exception {
+ /*
+ * Reduce on tuples with key extractor
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+ groupBy(new KeySelector1()).reduce(new Tuple3Reduce("B-)"));
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "5,2,B-)\n" +
+ "15,3,B-)\n" +
+ "34,4,B-)\n" +
+ "65,5,B-)\n" +
+ "111,6,B-)\n";
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ public static class KeySelector1 implements KeySelector<Tuple3<Integer,Long,String>, Long> {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Long getKey(Tuple3<Integer, Long, String> in) {
+ return in.f1;
+ }
+ }
+
+ @Test
+ public void testReduceOnCustomTypeWithKeyExtractor() throws Exception {
+ /*
+ * Reduce on custom type with key extractor
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> reduceDs = ds.
+ groupBy(new KeySelector2()).reduce(new CustomTypeReduce());
+
+ reduceDs.writeAsText(resultPath);
+ env.execute();
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
+ expected = "1,0,Hi\n" +
+ "2,3,Hello!\n" +
+ "3,12,Hello!\n" +
+ "4,30,Hello!\n" +
+ "5,60,Hello!\n" +
+ "6,105,Hello!\n";
+ }
+
+ public static class KeySelector2 implements KeySelector<CustomType, Integer> {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Integer getKey(CustomType in) {
+ return in.myInt;
}
-
- return toParameterList(tConfigs);
}
-
- private static class ReduceProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
- /*
- * Reduce on tuples with key field selector
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
- groupBy(1).reduce(new Tuple3Reduce("B-)"));
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "5,2,B-)\n" +
- "15,3,B-)\n" +
- "34,4,B-)\n" +
- "65,5,B-)\n" +
- "111,6,B-)\n";
- }
- case 2: {
- /*
- * Reduce on tuples with multiple key field selectors
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
- groupBy(4,0).reduce(new Tuple5Reduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,0,Hallo,1\n" +
- "2,3,2,Hallo Welt wie,1\n" +
- "2,2,1,Hallo Welt,2\n" +
- "3,9,0,P-),2\n" +
- "3,6,5,BCD,3\n" +
- "4,17,0,P-),1\n" +
- "4,17,0,P-),2\n" +
- "5,11,10,GHI,1\n" +
- "5,29,0,P-),2\n" +
- "5,25,0,P-),3\n";
- }
- case 3: {
- /*
- * Reduce on tuples with key extractor
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
- groupBy(new KeySelector<Tuple3<Integer,Long,String>, Long>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Long getKey(Tuple3<Integer, Long, String> in) {
- return in.f1;
- }
- }).reduce(new Tuple3Reduce("B-)"));
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "5,2,B-)\n" +
- "15,3,B-)\n" +
- "34,4,B-)\n" +
- "65,5,B-)\n" +
- "111,6,B-)\n";
-
- }
- case 4: {
- /*
- * Reduce on custom type with key extractor
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> reduceDs = ds.
- groupBy(new KeySelector<CustomType, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Integer getKey(CustomType in) {
- return in.myInt;
- }
- }).reduce(new CustomTypeReduce());
-
- reduceDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "1,0,Hi\n" +
- "2,3,Hello!\n" +
- "3,12,Hello!\n" +
- "4,30,Hello!\n" +
- "5,60,Hello!\n" +
- "6,105,Hello!\n";
- }
- case 5: {
- /*
- * All-reduce for tuple
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
- reduce(new AllAddingTuple3Reduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "231,91,Hello World\n";
- }
- case 6: {
- /*
- * All-reduce for custom types
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> reduceDs = ds.
- reduce(new AllAddingCustomTypeReduce());
-
- reduceDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "91,210,Hello!";
- }
- case 7: {
-
- /*
- * Reduce with broadcast set
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
- groupBy(1).reduce(new BCTuple3Reduce()).withBroadcastSet(intDs, "ints");
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "5,2,55\n" +
- "15,3,55\n" +
- "34,4,55\n" +
- "65,5,55\n" +
- "111,6,55\n";
- }
- case 8: {
- /*
- * Reduce with UDF that returns the second input object (check mutable object handling)
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
- groupBy(1).reduce(new InputReturningTuple3Reduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "5,2,Hi again!\n" +
- "15,3,Hi again!\n" +
- "34,4,Hi again!\n" +
- "65,5,Hi again!\n" +
- "111,6,Hi again!\n";
- }
- case 9: {
- /*
- * Reduce with a Tuple-returning KeySelector
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds .
- groupBy(
- new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
- return new Tuple2<Integer, Long>(t.f0, t.f4);
- }
- }).reduce(new Tuple5Reduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- return "1,1,0,Hallo,1\n" +
- "2,3,2,Hallo Welt wie,1\n" +
- "2,2,1,Hallo Welt,2\n" +
- "3,9,0,P-),2\n" +
- "3,6,5,BCD,3\n" +
- "4,17,0,P-),1\n" +
- "4,17,0,P-),2\n" +
- "5,11,10,GHI,1\n" +
- "5,29,0,P-),2\n" +
- "5,25,0,P-),3\n";
- }
- case 10: {
- /*
- * Case 2 with String-based field expression
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
- groupBy("f4","f0").reduce(new Tuple5Reduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,0,Hallo,1\n" +
- "2,3,2,Hallo Welt wie,1\n" +
- "2,2,1,Hallo Welt,2\n" +
- "3,9,0,P-),2\n" +
- "3,6,5,BCD,3\n" +
- "4,17,0,P-),1\n" +
- "4,17,0,P-),2\n" +
- "5,11,10,GHI,1\n" +
- "5,29,0,P-),2\n" +
- "5,25,0,P-),3\n";
- }
- case 11: {
- /**
- * Test support for Date and enum serialization
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<PojoWithDateAndEnum> ds = env.generateSequence(0,2).map(new MapFunction<Long, PojoWithDateAndEnum>() {
- @Override
- public PojoWithDateAndEnum map(Long value) throws Exception {
- int l = value.intValue();
- switch (l) {
- case 0:
- PojoWithDateAndEnum one = new PojoWithDateAndEnum();
- one.group = "a";
- one.date = new Date(666);
- one.cat = CollectionDataSets.Category.CAT_A;
- return one;
- case 1:
- PojoWithDateAndEnum two = new PojoWithDateAndEnum();
- two.group = "a";
- two.date = new Date(666);
- two.cat = CollectionDataSets.Category.CAT_A;
- return two;
- case 2:
- PojoWithDateAndEnum three = new PojoWithDateAndEnum();
- three.group = "b";
- three.date = new Date(666);
- three.cat = CollectionDataSets.Category.CAT_B;
- return three;
- }
- throw new RuntimeException("Unexpected value for l=" + l);
- }
- });
- ds = ds.union(CollectionDataSets.getPojoWithDateAndEnum(env));
-
- DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithDateAndEnum, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<PojoWithDateAndEnum> values,
- Collector<String> out) throws Exception {
- for(PojoWithDateAndEnum val : values) {
- if(val.cat == CollectionDataSets.Category.CAT_A) {
- Assert.assertEquals("a", val.group);
- } else if(val.cat == CollectionDataSets.Category.CAT_B) {
- Assert.assertEquals("b", val.group);
- } else {
- Assert.fail("error. Cat = "+val.cat);
- }
- Assert.assertEquals(666, val.date.getTime());
- }
- out.collect("ok");
- }
- });
-
- res.writeAsText(resultPath);
- env.execute();
- return "ok\nok";
+
+ @Test
+ public void testAllReduceForTuple() throws Exception {
+ /*
+ * All-reduce for tuple
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+ reduce(new AllAddingTuple3Reduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "231,91,Hello World\n";
+ }
+
+ @Test
+ public void testAllReduceForCustomTypes() throws Exception {
+ /*
+ * All-reduce for custom types
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> reduceDs = ds.
+ reduce(new AllAddingCustomTypeReduce());
+
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "91,210,Hello!";
+ }
+
+ @Test
+ public void testReduceWithBroadcastSet() throws Exception {
+ /*
+ * Reduce with broadcast set
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+ groupBy(1).reduce(new BCTuple3Reduce()).withBroadcastSet(intDs, "ints");
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "5,2,55\n" +
+ "15,3,55\n" +
+ "34,4,55\n" +
+ "65,5,55\n" +
+ "111,6,55\n";
+ }
+
+ @Test
+ public void testReduceWithUDFThatReturnsTheSecondInputObject() throws Exception {
+ /*
+ * Reduce with UDF that returns the second input object (check mutable object handling)
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+ groupBy(1).reduce(new InputReturningTuple3Reduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "5,2,Hi again!\n" +
+ "15,3,Hi again!\n" +
+ "34,4,Hi again!\n" +
+ "65,5,Hi again!\n" +
+ "111,6,Hi again!\n";
+ }
+
+ @Test
+ public void testReduceATupleReturningKeySelector() throws Exception {
+ /*
+ * Reduce with a Tuple-returning KeySelector
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds .
+ groupBy(new KeySelector3()).reduce(new Tuple5Reduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,0,Hallo,1\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "3,9,0,P-),2\n" +
+ "3,6,5,BCD,3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,10,GHI,1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
+ }
+
+ public static class KeySelector3 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }
+
+ @Test
+ public void testReduceOnTupleWithMultipleKeyExpressions() throws Exception {
+ /*
+ * Case 2 with String-based field expression
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
+ groupBy("f4","f0").reduce(new Tuple5Reduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,0,Hallo,1\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "3,9,0,P-),2\n" +
+ "3,6,5,BCD,3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,10,GHI,1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
+ }
+
+ @Test
+ public void testSupportForDataAndEnumSerialization() throws Exception {
+ /**
+ * Test support for Date and enum serialization
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<PojoWithDateAndEnum> ds = env.generateSequence(0,2).map(new Mapper1());
+ ds = ds.union(CollectionDataSets.getPojoWithDateAndEnum(env));
+
+ DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReducer1());
+
+ res.writeAsText(resultPath);
+ env.execute();
+ expected = "ok\nok";
+ }
+
+ public static class Mapper1 implements MapFunction<Long, PojoWithDateAndEnum> {
+ @Override
+ public PojoWithDateAndEnum map(Long value) throws Exception {
+ int l = value.intValue();
+ switch (l) {
+ case 0:
+ PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+ one.group = "a";
+ one.date = new Date(666);
+ one.cat = CollectionDataSets.Category.CAT_A;
+ return one;
+ case 1:
+ PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+ two.group = "a";
+ two.date = new Date(666);
+ two.cat = CollectionDataSets.Category.CAT_A;
+ return two;
+ case 2:
+ PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+ three.group = "b";
+ three.date = new Date(666);
+ three.cat = CollectionDataSets.Category.CAT_B;
+ return three;
}
-
- default:
- throw new IllegalArgumentException("Invalid program id");
+ throw new RuntimeException("Unexpected value for l=" + l);
+ }
+ }
+
+ public static class GroupReducer1 implements GroupReduceFunction<CollectionDataSets.PojoWithDateAndEnum, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void reduce(Iterable<PojoWithDateAndEnum> values,
+ Collector<String> out) throws Exception {
+ for(PojoWithDateAndEnum val : values) {
+ if(val.cat == CollectionDataSets.Category.CAT_A) {
+ Assert.assertEquals("a", val.group);
+ } else if(val.cat == CollectionDataSets.Category.CAT_B) {
+ Assert.assertEquals("b", val.group);
+ } else {
+ Assert.fail("error. Cat = "+val.cat);
+ }
+ Assert.assertEquals(666, val.date.getTime());
}
-
+ out.collect("ok");
}
-
}
public static class Tuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
index d63d08c..ee12fa4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
@@ -23,128 +23,96 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
@RunWith(Parameterized.class)
-public class SumMinMaxITCase extends JavaProgramTestBase {
+public class SumMinMaxITCase extends MultipleProgramsTestBase {
- private static int NUM_PROGRAMS = 3;
+ public SumMinMaxITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
- private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
- private String expectedResult;
+ private String expected;
- public SumMinMaxITCase(Configuration config) {
- super(config);
- }
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = SumMinMaxProgs.runProgram(curProgId, resultPath);
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
}
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+ @Test
+ public void testSumMaxAndProject() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Integer, Long>> sumDs = ds
+ .sum(0)
+ .andMax(1)
+ .project(0, 1);
+
+ sumDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "231,6\n";
}
- @Parameterized.Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+ @Test
+ public void testGroupedAggregate() throws Exception {
+ /*
+ * Grouped Aggregate
+ */
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
+ .sum(0)
+ .project(1, 0);
- return toParameterList(tConfigs);
+ aggregateDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1\n" +
+ "2,5\n" +
+ "3,15\n" +
+ "4,34\n" +
+ "5,65\n" +
+ "6,111\n";
}
- /**
- * These tests are copied from
- * @see org.apache.flink.test.javaApiOperators.AggregateITCase
- * replacing calls to aggregate with calls to sum, min, and max
- */
- private static class SumMinMaxProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
- switch(progId) {
- case 1: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Integer, Long>> sumDs = ds
- .sum(0)
- .andMax(1)
- .project(0, 1);
-
- sumDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "231,6\n";
- }
- case 2: {
- /*
- * Grouped Aggregate
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
- .sum(0)
- .project(1, 0);
-
- aggregateDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1\n" +
- "2,5\n" +
- "3,15\n" +
- "4,34\n" +
- "5,65\n" +
- "6,111\n";
- }
- case 3: {
- /*
- * Nested Aggregate
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
- .min(0)
- .min(0)
- .project(0);
-
- aggregateDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1\n";
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
- }
+ @Test
+ public void testNestedAggregate() throws Exception {
+ /*
+ * Nested Aggregate
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
+ .min(0)
+ .min(0)
+ .project(0);
+
+ aggregateDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1\n";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
index 4dabc62..0a23da8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
@@ -18,157 +18,129 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
-public class UnionITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 3;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+public class UnionITCase extends MultipleProgramsTestBase {
+
+ private static final String FULL_TUPLE_3_STRING = "1,1,Hi\n" +
+ "2,2,Hello\n" +
+ "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" +
+ "5,3,I am fine.\n" +
+ "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" +
+ "8,4,Comment#2\n" +
+ "9,4,Comment#3\n" +
+ "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" +
+ "12,5,Comment#6\n" +
+ "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" +
+ "16,6,Comment#10\n" +
+ "17,6,Comment#11\n" +
+ "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" +
+ "20,6,Comment#14\n" +
+ "21,6,Comment#15\n";
+
+ public UnionITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
private String resultPath;
- private String expectedResult;
+ private String expected;
- public UnionITCase(Configuration config) {
- super(config);
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = UnionProgs.runProgram(curProgId, resultPath);
+ @Test
+ public void testUnion2IdenticalDataSets() throws Exception {
+ /*
+ * Union of 2 Same Data Sets
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env));
+
+ unionDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ @Test
+ public void testUnion5IdenticalDataSets() throws Exception {
+ /*
+ * Union of 5 same Data Sets, with multiple unions
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
+ .union(CollectionDataSets.get3TupleDataSet(env))
+ .union(CollectionDataSets.get3TupleDataSet(env))
+ .union(CollectionDataSets.get3TupleDataSet(env));
+
+ unionDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING +
+ FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ @Test
+ public void testUnionWithEmptyDataSet() throws Exception {
+ /*
+ * Test on union with empty dataset
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
+ // Don't know how to make an empty result in an other way than filtering it
+ DataSet<Tuple3<Integer, Long, String>> empty = CollectionDataSets.get3TupleDataSet(env).
+ filter(new RichFilter1());
+
+ DataSet<Tuple3<Integer, Long, String>> unionDs = CollectionDataSets.get3TupleDataSet(env)
+ .union(empty);
+
+ unionDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = FULL_TUPLE_3_STRING;
}
-
- private static class UnionProgs {
-
- private static final String FULL_TUPLE_3_STRING = "1,1,Hi\n" +
- "2,2,Hello\n" +
- "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" +
- "5,3,I am fine.\n" +
- "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" +
- "8,4,Comment#2\n" +
- "9,4,Comment#3\n" +
- "10,4,Comment#4\n" +
- "11,5,Comment#5\n" +
- "12,5,Comment#6\n" +
- "13,5,Comment#7\n" +
- "14,5,Comment#8\n" +
- "15,5,Comment#9\n" +
- "16,6,Comment#10\n" +
- "17,6,Comment#11\n" +
- "18,6,Comment#12\n" +
- "19,6,Comment#13\n" +
- "20,6,Comment#14\n" +
- "21,6,Comment#15\n";
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
- /*
- * Union of 2 Same Data Sets
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env));
-
- unionDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
- }
- case 2: {
- /*
- * Union of 5 same Data Sets, with multiple unions
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
- .union(CollectionDataSets.get3TupleDataSet(env))
- .union(CollectionDataSets.get3TupleDataSet(env))
- .union(CollectionDataSets.get3TupleDataSet(env));
-
- unionDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
- }
- case 3: {
- /*
- * Test on union with empty dataset
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // Don't know how to make an empty result in an other way than filtering it
- DataSet<Tuple3<Integer, Long, String>> empty = CollectionDataSets.get3TupleDataSet(env).
- filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
- return false;
- }
- });
-
- DataSet<Tuple3<Integer, Long, String>> unionDs = CollectionDataSets.get3TupleDataSet(env)
- .union(empty);
-
- unionDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return FULL_TUPLE_3_STRING;
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
-
+
+ public static class RichFilter1 extends RichFilterFunction<Tuple3<Integer,Long,String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+ return false;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index 0d6b763..ba66695 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -19,8 +19,12 @@ package org.apache.flink.api.scala.functions
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, JavaProgramTestBase}
import org.junit.Assert.fail
+import org.junit.{After, Before, Test, Rule}
+import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
@@ -34,79 +38,75 @@ import org.apache.flink.api.common.InvalidProgramException
/* The test cases are originally from the Apache Spark project. Like the ClosureCleaner itself. */
@RunWith(classOf[Parameterized])
-class ClosureCleanerITCase(config: Configuration) extends JavaProgramTestBase(config) {
+class ClosureCleanerITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var curProgId: Int = config.getInteger("ProgramId", -1)
- private var resultPath: String = null
- private var expectedResult: String = null
+ val _tempFolder = new TemporaryFolder()
+ var resultPath: String = _
+ var result: String = _
- protected override def preSubmit(): Unit = {
- resultPath = getTempDirPath("result")
- }
-
- protected def testProgram(): Unit = {
- expectedResult = curProgId match {
- case 1 =>
- TestObject.run(resultPath)
- "30" // 6 + 7 + 8 + 9
-
- case 2 =>
- val obj = new TestClass
- obj.run(resultPath)
- "30" // 6 + 7 + 8 + 9
-
- case 3 =>
- val obj = new TestClassWithoutDefaultConstructor(5)
- obj.run(resultPath)
- "30" // 6 + 7 + 8 + 9
+ @Rule
+ def tempFolder = _tempFolder
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
+ }
- case 4 =>
- val obj = new TestClassWithoutFieldAccess
- obj.run(resultPath)
- "30" // 6 + 7 + 8 + 9
-
- case 5 =>
- TestObjectWithNesting.run(resultPath)
- "27"
-
- case 6 =>
- val obj = new TestClassWithNesting(1)
- obj.run(resultPath)
- "27"
+ @After
+ def after: Unit = {
+ compareResultsByLinesInMemory(result, resultPath)
+ }
- case 7 =>
- TestObjectWithBogusReturns.run(resultPath)
- "1"
+ @Test
+ def testObject: Unit = {
+ TestObject.run(resultPath)
+ result = "30"
+ }
- case 8 =>
- TestObjectWithNestedReturns.run(resultPath)
- "1"
+ @Test
+ def testClass: Unit = {
+ val obj = new TestClass
+ obj.run(resultPath)
+ result = "30"
+ }
- case _ =>
- throw new IllegalArgumentException("Invalid program id")
- }
+ @Test
+ def testClassWithoutDefaulConstructor: Unit = {
+ val obj = new TestClassWithoutDefaultConstructor(5)
+ obj.run(resultPath)
+ result = "30"
}
- protected override def postSubmit(): Unit = {
- compareResultsByLinesInMemory(expectedResult, resultPath)
+ @Test
+ def testClassWithoutFieldAccess: Unit = {
+ val obj = new TestClassWithoutFieldAccess
+ obj.run(resultPath)
+ result = "30" // 6 + 7 + 8 + 9
}
-}
-object ClosureCleanerITCase {
+ @Test
+ def testObjectWithNesting: Unit = {
+ TestObjectWithNesting.run(resultPath)
+ result = "27"
+ }
- val NUM_PROGRAMS = 6
+ @Test
+ def testClassWithNesting: Unit = {
+ val obj = new TestClassWithNesting(1)
+ obj.run(resultPath)
+ result = "27"
+ }
- @Parameters
- def getConfigurations: java.util.Collection[Array[AnyRef]] = {
- val configs = mutable.MutableList[Array[AnyRef]]()
- for (i <- 1 to NUM_PROGRAMS) {
- val config = new Configuration()
- config.setInteger("ProgramId", i)
- configs += Array(config)
- }
+ @Test
+ def testObjectWithBogusReturns: Unit = {
+ TestObjectWithBogusReturns.run(resultPath)
+ result = "1"
+ }
- configs.asJavaCollection
+ @Test
+ def testObjectWithNestedReturns: Unit = {
+ TestObjectWithNestedReturns.run(resultPath)
+ result = "1"
}
}
@@ -121,7 +121,7 @@ object TestObject {
val env = ExecutionEnvironment.getExecutionEnvironment
val nums = env.fromElements(1, 2, 3, 4)
- nums.map(_ + x).reduce(_ + _).writeAsText(resultPath)
+ nums.map(_ + x).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
env.execute()
}
@@ -138,7 +138,7 @@ class TestClass extends Serializable {
val env = ExecutionEnvironment.getExecutionEnvironment
val nums = env.fromElements(1, 2, 3, 4)
- nums.map(_ + getX).reduce(_ + _).writeAsText(resultPath)
+ nums.map(_ + getX).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
env.execute()
}
@@ -153,7 +153,7 @@ class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
val env = ExecutionEnvironment.getExecutionEnvironment
val nums = env.fromElements(1, 2, 3, 4)
- nums.map(_ + getX).reduce(_ + _).writeAsText(resultPath)
+ nums.map(_ + getX).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
env.execute()
}
@@ -171,7 +171,7 @@ class TestClassWithoutFieldAccess {
val env = ExecutionEnvironment.getExecutionEnvironment
val nums = env.fromElements(1, 2, 3, 4)
- nums.map(_ + x).reduce(_ + _).writeAsText(resultPath)
+ nums.map(_ + x).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
env.execute()
}
@@ -193,7 +193,7 @@ object TestObjectWithBogusReturns {
case _ => fail("Bogus return statement not detected.")
}
- nums.writeAsText(resultPath)
+ nums.writeAsText(resultPath, WriteMode.OVERWRITE)
env.execute()
}
@@ -213,7 +213,7 @@ object TestObjectWithNestedReturns {
foo()
}
- nums.writeAsText(resultPath)
+ nums.writeAsText(resultPath, WriteMode.OVERWRITE)
env.execute()
}
@@ -235,7 +235,7 @@ object TestObjectWithNesting {
in.map(_ + x + y).reduce(_ + _).withBroadcastSet(nums, "nums")
}
- result.writeAsText(resultPath)
+ result.writeAsText(resultPath, WriteMode.OVERWRITE)
env.execute()
}
@@ -259,7 +259,7 @@ class TestClassWithNesting(val y: Int) extends Serializable {
in.map(_ + x + getY).reduce(_ + _).withBroadcastSet(nums, "nums")
}
- result.writeAsText(resultPath)
+ result.writeAsText(resultPath, WriteMode.OVERWRITE)
env.execute()
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
index 75e8a66..ae3512a 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
@@ -20,121 +20,97 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
import org.apache.flink.api.scala._
+@RunWith(classOf[Parameterized])
+class AggregateITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
-object AggregateProgs {
- var NUM_PROGRAMS: Int = 3
-
- def runProgram(progId: Int, resultPath: String): String = {
- progId match {
- case 1 =>
- // Full aggregate
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
-
- val aggregateDs = ds
- .aggregate(Aggregations.SUM,0)
- .and(Aggregations.MAX, 1)
- // Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
- .map{ t => (t._1, t._2) }
+ val _tempFolder = new TemporaryFolder()
- aggregateDs.writeAsCsv(resultPath)
+ private var resultPath: String = null
+ private var expectedResult: String = null
- env.execute()
+ @Rule
+ def tempFolder = _tempFolder
- // return expected result
- "231,6\n"
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
+ }
- case 2 =>
- // Grouped aggregate
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
+ @After
+ def after(): Unit = {
+ compareResultsByLinesInMemory(expectedResult, resultPath)
+ }
- val aggregateDs = ds
- .groupBy(1)
- .aggregate(Aggregations.SUM, 0)
- // Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
- .map { t => (t._2, t._1) }
+ @Test
+ def testFullAggregate: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
- aggregateDs.writeAsCsv(resultPath)
+ val ds = CollectionDataSets.get3TupleDataSet(env)
- env.execute()
+ val aggregateDs = ds
+ .aggregate(Aggregations.SUM,0)
+ .and(Aggregations.MAX, 1)
+ // Ensure aggregate operator correctly copies other fields
+ .filter(_._3 != null)
+ .map{ t => (t._1, t._2) }
- // return expected result
- "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+ aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- case 3 =>
- // Nested aggregate
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
+ env.execute()
- val aggregateDs = ds
- .groupBy(1)
- .aggregate(Aggregations.MIN, 0)
- .aggregate(Aggregations.MIN, 0)
- // Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
- .map { t => new Tuple1(t._1) }
+ // return expected result
+ expectedResult = "231,6\n"
+ }
- aggregateDs.writeAsCsv(resultPath)
+ @Test
+ def testGroupedAggregate: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
- env.execute()
+ val aggregateDs = ds
+ .groupBy(1)
+ .aggregate(Aggregations.SUM, 0)
+ // Ensure aggregate operator correctly copies other fields
+ .filter(_._3 != null)
+ .map { t => (t._2, t._1) }
- // return expected result
- "1\n"
+ aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
- case _ =>
- throw new IllegalArgumentException("Invalid program id")
- }
+ // return expected result
+ expectedResult = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
}
-}
-
-@RunWith(classOf[Parameterized])
-class AggregateITCase(config: Configuration) extends JavaProgramTestBase(config) {
+ @Test
+ def testNestedAggregate: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
- private var curProgId: Int = config.getInteger("ProgramId", -1)
- private var resultPath: String = null
- private var expectedResult: String = null
+ val aggregateDs = ds
+ .groupBy(1)
+ .aggregate(Aggregations.MIN, 0)
+ .aggregate(Aggregations.MIN, 0)
+ // Ensure aggregate operator correctly copies other fields
+ .filter(_._3 != null)
+ .map { t => new Tuple1(t._1) }
- protected override def preSubmit(): Unit = {
- resultPath = getTempDirPath("result")
- }
+ aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- protected def testProgram(): Unit = {
- expectedResult = AggregateProgs.runProgram(curProgId, resultPath)
- }
-
- protected override def postSubmit(): Unit = {
- compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-}
+ env.execute()
-object AggregateITCase {
- @Parameters
- def getConfigurations: java.util.Collection[Array[AnyRef]] = {
- val configs = mutable.MutableList[Array[AnyRef]]()
- for (i <- 1 to AggregateProgs.NUM_PROGRAMS) {
- val config = new Configuration()
- config.setInteger("ProgramId", i)
- configs += Array(config)
- }
-
- configs.asJavaCollection
+ // return expected result
+ expectedResult = "1\n"
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index 72e5648..93ade52 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -21,382 +21,367 @@ import org.apache.flink.api.common.functions.RichCoGroupFunction
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.apache.flink.util.Collector
+import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-import org.junit.Assert
+import org.junit._
import scala.collection.JavaConverters._
-import scala.collection.mutable
import org.apache.flink.api.scala._
+@RunWith(classOf[Parameterized])
+class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+ val _tempFolder = new TemporaryFolder()
+ var resultPath: String = _
+ var expectedResult: String = _
-object CoGroupProgs {
- var NUM_PROGRAMS: Int = 13
+ @Rule
+ def tempFolder = _tempFolder
- def runProgram(progId: Int, resultPath: String): String = {
- progId match {
- case 1 =>
- /*
- * CoGroup on tuples with key field selector
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get5TupleDataSet(env)
- val ds2 = CollectionDataSets.get5TupleDataSet(env)
- val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
- (first, second) =>
- var sum = 0
- var id = 0
- for (t <- first) {
- sum += t._3
- id = t._1
- }
- for (t <- second) {
- sum += t._3
- id = t._1
- }
- (id, sum)
- }
- coGroupDs.writeAsCsv(resultPath)
- env.execute()
- "1,0\n" + "2,6\n" + "3,24\n" + "4,60\n" + "5,120\n"
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
+ }
- case 2 =>
- /*
- * CoGroup on two custom type inputs with key extractors
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getCustomTypeDataSet(env)
- val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+ @After
+ def after: Unit = {
+ compareResultsByLinesInMemory(expectedResult, resultPath)
+ }
- val coGroupDs = ds.coGroup(ds2).where(_.myInt).equalTo(_.myInt) apply {
- (first, second) =>
- val o = new CustomType(0, 0, "test")
- for (c <- first) {
- o.myInt = c.myInt
- o.myLong += c.myLong
- }
- for (c <- second) {
- o.myInt = c.myInt
- o.myLong += c.myLong
- }
- o
+ @Test
+ def testCoGroupOnTuplesWithKeyFieldSelector: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get5TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
+ (first, second) =>
+ var sum = 0
+ var id = 0
+ for (t <- first) {
+ sum += t._3
+ id = t._1
}
- coGroupDs.writeAsText(resultPath)
- env.execute()
- "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" + "6," +
- "210,test\n"
-
- case 3 =>
- /*
- * check correctness of cogroup if UDF returns left input objects
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val ds2 = CollectionDataSets.get3TupleDataSet(env)
- val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
- (first, second, out: Collector[(Int, Long, String)] ) =>
- for (t <- first) {
- if (t._1 < 6) {
- out.collect(t)
- }
- }
+ for (t <- second) {
+ sum += t._3
+ id = t._1
}
- coGroupDs.writeAsCsv(resultPath)
- env.execute()
- "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
- "how are you?\n" + "5,3,I am fine.\n"
+ (id, sum)
+ }
+ coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "1,0\n" + "2,6\n" + "3,24\n" + "4,60\n" + "5,120\n"
+ }
- case 4 =>
- /*
- * check correctness of cogroup if UDF returns right input objects
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get5TupleDataSet(env)
- val ds2 = CollectionDataSets.get5TupleDataSet(env)
- val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
- (first, second, out: Collector[(Int, Long, Int, String, Long)]) =>
- for (t <- second) {
- if (t._1 < 4) {
- out.collect(t)
- }
- }
+ @Test
+ def testCoGroupOnTwoCustomInputsWithKeyExtractors: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.getCustomTypeDataSet(env)
+ val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+
+ val coGroupDs = ds.coGroup(ds2).where(_.myInt).equalTo(_.myInt) apply {
+ (first, second) =>
+ val o = new CustomType(0, 0, "test")
+ for (c <- first) {
+ o.myInt = c.myInt
+ o.myLong += c.myLong
}
- coGroupDs.writeAsCsv(resultPath)
- env.execute()
- "1,1,0,Hallo,1\n" + "2,2,1,Hallo Welt,2\n" + "2,3,2,Hallo Welt wie,1\n" + "3,4,3," +
- "Hallo Welt wie gehts?,2\n" + "3,5,4,ABC,2\n" + "3,6,5,BCD,3\n"
-
- case 5 =>
- val env = ExecutionEnvironment.getExecutionEnvironment
- val intDs = CollectionDataSets.getIntDataSet(env)
- val ds = CollectionDataSets.get5TupleDataSet(env)
- val ds2 = CollectionDataSets.get5TupleDataSet(env)
- val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).apply(
- new RichCoGroupFunction[
- (Int, Long, Int, String, Long),
- (Int, Long, Int, String, Long),
- (Int, Int, Int)] {
- private var broadcast = 41
-
- override def open(config: Configuration) {
- val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
- broadcast = ints.sum
- }
-
- override def coGroup(
- first: java.lang.Iterable[(Int, Long, Int, String, Long)],
- second: java.lang.Iterable[(Int, Long, Int, String, Long)],
- out: Collector[(Int, Int, Int)]): Unit = {
- var sum = 0
- var id = 0
- for (t <- first.asScala) {
- sum += t._3
- id = t._1
- }
- for (t <- second.asScala) {
- sum += t._3
- id = t._1
- }
- out.collect((id, sum, broadcast))
- }
-
- }).withBroadcastSet(intDs, "ints")
- coGroupDs.writeAsCsv(resultPath)
- env.execute()
- "1,0,55\n" + "2,6,55\n" + "3,24,55\n" + "4,60,55\n" + "5,120,55\n"
-
- case 6 =>
- /*
- * CoGroup on a tuple input with key field selector and a custom type input with
- * key extractor
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get5TupleDataSet(env)
- val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
- val coGroupDs = ds.coGroup(ds2).where(2).equalTo(_.myInt) apply {
- (first, second) =>
- var sum = 0L
- var id = 0
- for (t <- first) {
- sum += t._1
- id = t._3
- }
- for (t <- second) {
- sum += t.myLong
- id = t.myInt
- }
- (id, sum, "test")
+ for (c <- second) {
+ o.myInt = c.myInt
+ o.myLong += c.myLong
}
- coGroupDs.writeAsCsv(resultPath)
- env.execute()
- "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" + "5," +
- "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
- "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
-
- case 7 =>
- /*
- * CoGroup on a tuple input with key field selector and a custom type input with
- * key extractor
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get5TupleDataSet(env)
- val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
- val coGroupDs = ds2.coGroup(ds).where(_.myInt).equalTo(2) {
- (first, second) =>
- var sum = 0L
- var id = 0
- for (t <- first) {
- sum += t.myLong
- id = t.myInt
- }
- for (t <- second) {
- sum += t._1
- id = t._3
- }
+ o
+ }
+ coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" +
+ "6," + "210,test\n"
+ }
- new CustomType(id, sum, "test")
+ @Test
+ def testCorrectnessIfCoGroupReturnsLeftInputObjects: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get3TupleDataSet(env)
+ val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
+ (first, second, out: Collector[(Int, Long, String)] ) =>
+ for (t <- first) {
+ if (t._1 < 6) {
+ out.collect(t)
+ }
}
- coGroupDs.writeAsText(resultPath)
- env.execute()
- "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" + "5," +
- "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
- "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
+ }
+ coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ "how are you?\n" + "5,3,I am fine.\n"
+ }
- case 8 =>
- /*
- * CoGroup with multiple key fields
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.get5TupleDataSet(env)
- val ds2 = CollectionDataSets.get3TupleDataSet(env)
- val coGrouped = ds1.coGroup(ds2).where(0,4).equalTo(0, 1) {
- (first, second, out: Collector[(Int, Long, String)]) =>
- val strs = first map(_._4)
- for (t <- second) {
- for (s <- strs) {
- out.collect((t._1, t._2, s))
- }
- }
+ @Test
+ def testCorrectnessIfCoGroupReturnsRightInputObjects: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get5TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
+ (first, second, out: Collector[(Int, Long, Int, String, Long)]) =>
+ for (t <- second) {
+ if (t._1 < 4) {
+ out.collect(t)
+ }
}
+ }
+ coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "1,1,0,Hallo,1\n" + "2,2,1,Hallo Welt,2\n" + "2,3,2,Hallo Welt wie,1\n" +
+ "3,4,3," + "Hallo Welt wie gehts?,2\n" + "3,5,4,ABC,2\n" + "3,6,5,BCD,3\n"
+ }
- coGrouped.writeAsCsv(resultPath)
- env.execute()
- "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
- "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
+ @Test
+ def testCoGroupWithBroadcastVariable: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val intDs = CollectionDataSets.getIntDataSet(env)
+ val ds = CollectionDataSets.get5TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).apply(
+ new RichCoGroupFunction[
+ (Int, Long, Int, String, Long),
+ (Int, Long, Int, String, Long),
+ (Int, Int, Int)] {
+ private var broadcast = 41
+
+ override def open(config: Configuration) {
+ val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
+ broadcast = ints.sum
+ }
- case 9 =>
- /*
- * CoGroup with multiple key fields
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets
- .get5TupleDataSet(env)
- val ds2 = CollectionDataSets.get3TupleDataSet(env)
- val coGrouped = ds1.coGroup(ds2).where(t => (t._1, t._5)).equalTo(t => (t._1, t._2))
- .apply {
- (first, second, out: Collector[(Int, Long, String)]) =>
- val strs = first map(_._4)
- for (t <- second) {
- for (s <- strs) {
- out.collect((t._1, t._2, s))
- }
- }
+ override def coGroup(
+ first: java.lang.Iterable[(Int, Long, Int, String, Long)],
+ second: java.lang.Iterable[(Int, Long, Int, String, Long)],
+ out: Collector[(Int, Int, Int)]): Unit = {
+ var sum = 0
+ var id = 0
+ for (t <- first.asScala) {
+ sum += t._3
+ id = t._1
+ }
+ for (t <- second.asScala) {
+ sum += t._3
+ id = t._1
+ }
+ out.collect((id, sum, broadcast))
}
- coGrouped.writeAsCsv(resultPath)
- env.execute()
- "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
- "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
+ }).withBroadcastSet(intDs, "ints")
+ coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "1,0,55\n" + "2,6,55\n" + "3,24,55\n" + "4,60,55\n" + "5,120,55\n"
+ }
- case 10 =>
- /*
- * CoGroup on two custom type inputs using expression keys
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getCustomTypeDataSet(env)
- val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
- val coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt") {
- (first, second) =>
- val o = new CustomType(0, 0, "test")
- for (t <- first) {
- o.myInt = t.myInt
- o.myLong += t.myLong
- }
- for (t <- second) {
- o.myInt = t.myInt
- o.myLong += t.myLong
- }
- o
+ @Test
+ def testCoGroupOnTupleWithKeyFieldSelectorAndCustomTypeWithKeyExtractor: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get5TupleDataSet(env)
+ val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+ val coGroupDs = ds.coGroup(ds2).where(2).equalTo(_.myInt) apply {
+ (first, second) =>
+ var sum = 0L
+ var id = 0
+ for (t <- first) {
+ sum += t._1
+ id = t._3
}
- coGroupDs.writeAsText(resultPath)
- env.execute()
- "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" + "6," +
- "210,test\n"
-
- case 11 =>
- /*
- * CoGroup on two custom type inputs using expression keys
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getSmallPojoDataSet(env)
- val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
- val coGroupDs = ds.coGroup(ds2).where("nestedPojo.longNumber").equalTo(6) {
- (first, second, out: Collector[CustomType]) =>
- for (p <- first) {
- for (t <- second) {
- Assert.assertTrue(p.nestedPojo.longNumber == t._7)
- out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
- }
- }
+ for (t <- second) {
+ sum += t.myLong
+ id = t.myInt
}
- coGroupDs.writeAsText(resultPath)
- env.execute()
- "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
+ (id, sum, "test")
+ }
+ coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" +
+ "5," + "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
+ "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
+ }
- case 12 =>
- /*
- * CoGroup field-selector (expression keys) + key selector function
- * The key selector is unnecessary complicated (Tuple1) ;)
+ @Test
+ def testCoGroupOnCustomTypeWithKeyExtractorAndTupleInputKeyFieldSelector: Unit = {
+ /*
+ * CoGroup on a tuple input with key field selector and a custom type input with
+ * key extractor
*/
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getSmallPojoDataSet(env)
- val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
- val coGroupDs = ds.coGroup(ds2).where(t => new Tuple1(t.nestedPojo.longNumber)).equalTo(6) {
- (first, second, out: Collector[CustomType]) =>
- for (p <- first) {
- for (t <- second) {
- Assert.assertTrue(p.nestedPojo.longNumber == t._7)
- out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
- }
- }
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get5TupleDataSet(env)
+ val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+ val coGroupDs = ds2.coGroup(ds).where(_.myInt).equalTo(2) {
+ (first, second) =>
+ var sum = 0L
+ var id = 0
+ for (t <- first) {
+ sum += t.myLong
+ id = t.myInt
}
- coGroupDs.writeAsText(resultPath)
- env.execute()
- "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
-
- case 13 =>
- /*
- * CoGroup field-selector (expression keys) + key selector function
- * The key selector is simple here
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getSmallPojoDataSet(env)
- val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
- val coGroupDs = ds.coGroup(ds2).where(_.nestedPojo.longNumber).equalTo(6) {
- (first, second, out: Collector[CustomType]) =>
- for (p <- first) {
- for (t <- second) {
- Assert.assertTrue(p.nestedPojo.longNumber == t._7)
- out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
- }
- }
+ for (t <- second) {
+ sum += t._1
+ id = t._3
}
- coGroupDs.writeAsText(resultPath)
- env.execute()
- "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
- case _ =>
- throw new IllegalArgumentException("Invalid program id")
+ new CustomType(id, sum, "test")
}
+ coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "0,1,test\n" + "1,2,test\n" + "2,5,test\n" + "3,15,test\n" + "4,33,test\n" +
+ "5," + "63,test\n" + "6,109,test\n" + "7,4,test\n" + "8,4,test\n" + "9,4,test\n" + "10,5," +
+ "test\n" + "11,5,test\n" + "12,5,test\n" + "13,5,test\n" + "14,5,test\n"
}
-}
+ @Test
+ def testCoGroupWithMultipleKeyFields: Unit = {
+ /*
+ * CoGroup with multiple key fields
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds1 = CollectionDataSets.get5TupleDataSet(env)
+ val ds2 = CollectionDataSets.get3TupleDataSet(env)
+ val coGrouped = ds1.coGroup(ds2).where(0,4).equalTo(0, 1) {
+ (first, second, out: Collector[(Int, Long, String)]) =>
+ val strs = first map(_._4)
+ for (t <- second) {
+ for (s <- strs) {
+ out.collect((t._1, t._2, s))
+ }
+ }
+ }
-@RunWith(classOf[Parameterized])
-class CoGroupITCase(config: Configuration) extends JavaProgramTestBase(config) {
+ coGrouped.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
+ "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
+ }
- private val curProgId: Int = config.getInteger("ProgramId", -1)
- private var resultPath: String = null
- private var expectedResult: String = null
+ @Test
+ def testCoGroupWithMultipleKeyExtractors: Unit = {
+ /*
+ * CoGroup with multiple key extractors
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds1 = CollectionDataSets
+ .get5TupleDataSet(env)
+ val ds2 = CollectionDataSets.get3TupleDataSet(env)
+ val coGrouped = ds1.coGroup(ds2).where(t => (t._1, t._5)).equalTo(t => (t._1, t._2))
+ .apply {
+ (first, second, out: Collector[(Int, Long, String)]) =>
+ val strs = first map(_._4)
+ for (t <- second) {
+ for (s <- strs) {
+ out.collect((t._1, t._2, s))
+ }
+ }
+ }
+
+ coGrouped.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "1,1,Hallo\n" + "2,2,Hallo Welt\n" + "3,2,Hallo Welt wie gehts?\n" + "3,2," +
+ "ABC\n" + "5,3,HIJ\n" + "5,3,IJK\n"
- protected override def preSubmit(): Unit = {
- resultPath = getTempDirPath("result")
}
- protected def testProgram(): Unit = {
- expectedResult = CoGroupProgs.runProgram(curProgId, resultPath)
+ @Test
+ def testCoGroupOnTwoCustomTypesUsingExpressionKeys: Unit = {
+ /*
+ * CoGroup on two custom type inputs using expression keys
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.getCustomTypeDataSet(env)
+ val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
+ val coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt") {
+ (first, second) =>
+ val o = new CustomType(0, 0, "test")
+ for (t <- first) {
+ o.myInt = t.myInt
+ o.myLong += t.myLong
+ }
+ for (t <- second) {
+ o.myInt = t.myInt
+ o.myLong += t.myLong
+ }
+ o
+ }
+ coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "1,0,test\n" + "2,6,test\n" + "3,24,test\n" + "4,60,test\n" + "5,120,test\n" +
+ "6," + "210,test\n"
}
- protected override def postSubmit(): Unit = {
- compareResultsByLinesInMemory(expectedResult, resultPath)
+ @Test
+ def testCoGroupOnTwoCustomTypesUsingExpressionKeysAndFieldSelector: Unit = {
+ /*
+ * CoGroup on two custom type inputs using expression keys
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.getSmallPojoDataSet(env)
+ val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
+ val coGroupDs = ds.coGroup(ds2).where("nestedPojo.longNumber").equalTo(6) {
+ (first, second, out: Collector[CustomType]) =>
+ for (p <- first) {
+ for (t <- second) {
+ Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+ out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+ }
+ }
+ }
+ coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
}
-}
-object CoGroupITCase {
- @Parameters
- def getConfigurations: java.util.Collection[Array[AnyRef]] = {
- val configs = mutable.MutableList[Array[AnyRef]]()
- for (i <- 1 to CoGroupProgs.NUM_PROGRAMS) {
- val config = new Configuration()
- config.setInteger("ProgramId", i)
- configs += Array(config)
+ @Test
+ def testCoGroupFieldSelectorAndKeySelector: Unit = {
+ /*
+ * CoGroup field-selector (expression keys) + key selector function
+ * The key selector is unnecessary complicated (Tuple1) ;)
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.getSmallPojoDataSet(env)
+ val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
+ val coGroupDs = ds.coGroup(ds2).where(t => new Tuple1(t.nestedPojo.longNumber)).equalTo(6) {
+ (first, second, out: Collector[CustomType]) =>
+ for (p <- first) {
+ for (t <- second) {
+ Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+ out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+ }
+ }
}
+ coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
+ }
- configs.asJavaCollection
+ @Test
+ def testCoGroupKeySelectorAndFieldSelector: Unit = {
+ /*
+ * CoGroup field-selector (expression keys) + key selector function
+ * The key selector is simple here
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.getSmallPojoDataSet(env)
+ val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
+ val coGroupDs = ds.coGroup(ds2).where(_.nestedPojo.longNumber).equalTo(6) {
+ (first, second, out: Collector[CustomType]) =>
+ for (p <- first) {
+ for (t <- second) {
+ Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+ out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+ }
+ }
+ }
+ coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expectedResult = "-1,20000,Flink\n" + "-1,10000,Flink\n" + "-1,30000,Flink\n"
}
}