You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:10 UTC
[74/82] [abbrv] incubator-flink git commit: Change integration tests to reuse cluster in order to save startup and shutdown time.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
index d645bc6..9755caa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
@@ -18,10 +18,6 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -30,28 +26,18 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.Assert;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
/**
* Tests for the DataSource
*/
-@RunWith(Parameterized.class)
public class DataSourceITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 1;
-
- private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
private String inputPath;
private String expectedResult;
- public DataSourceITCase(Configuration config) {
- super(config);
- }
-
+
@Override
protected void preSubmit() throws Exception {
inputPath = createTempFile("input", "ab\n"
@@ -62,70 +48,42 @@ public class DataSourceITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- expectedResult = DataSourceProgs.runProgram(curProgId, inputPath, resultPath);
+ /*
+ * Test passing a configuration object to an input format
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Configuration ifConf = new Configuration();
+ ifConf.setString("prepend", "test");
+
+ DataSet<String> ds = env.createInput(new TestInputFormat(new Path(inputPath))).withParameters(ifConf);
+ ds.writeAsText(resultPath);
+ env.execute();
+
+ expectedResult= "ab\n"
+ + "cd\n"
+ + "ef\n";
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(expectedResult, resultPath);
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
- }
-
private static class TestInputFormat extends TextInputFormat {
private static final long serialVersionUID = 1L;
public TestInputFormat(Path filePath) {
super(filePath);
}
-
+
@Override
public void configure(Configuration parameters) {
super.configure(parameters);
-
+
Assert.assertNotNull(parameters.getString("prepend", null));
Assert.assertEquals("test", parameters.getString("prepend", null));
}
-
- }
- private static class DataSourceProgs {
-
- public static String runProgram(int progId, String inputPath, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
- /*
- * Test passing a configuration object to an input format
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Configuration ifConf = new Configuration();
- ifConf.setString("prepend", "test");
-
- DataSet<String> ds = env.createInput(new TestInputFormat(new Path(inputPath))).withParameters(ifConf);
- ds.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "ab\n"
- + "cd\n"
- + "ef\n";
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
- }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index fb62459..84964a2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -18,9 +18,6 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -31,280 +28,266 @@ import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
-public class DistinctITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 8;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+public class DistinctITCase extends MultipleProgramsTestBase {
+
+ public DistinctITCase(ExecutionMode mode){
+ super(mode);
+ }
+
private String resultPath;
- private String expectedResult;
-
- public DistinctITCase(Configuration config) {
- super(config);
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = DistinctProgs.runProgram(curProgId, resultPath);
+ @Test
+ public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector() throws Exception {
+ /*
+ * check correctness of distinct on tuples with key field selector
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(0, 1, 2);
+
+ distinctDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "2,2,Hello\n" +
+ "3,2,Hello world\n";
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ @Test
+ public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorWithNotAllFieldsSelected()
+ throws Exception{
+ /*
+ * check correctness of distinct on tuples with key field selector with not all fields selected
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0);
+
+ distinctDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1\n" +
+ "2\n";
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
+
+ @Test
+ public void testCorrectnessOfDistinctOnTuplesWithKeyExtractorFunction() throws Exception {
+ /*
+ * check correctness of distinct on tuples with key extractor function
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
+ .distinct(new KeySelector1()).project(0);
+
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1\n" +
+ "2\n";
+ }
+
+ public static class KeySelector1 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Integer> {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Integer getKey(Tuple5<Integer, Long, Integer, String, Long> in) {
+ return in.f0;
}
-
- return toParameterList(tConfigs);
}
-
- private static class DistinctProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
-
- /*
- * check correctness of distinct on tuples with key field selector
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(0, 1, 2);
-
- distinctDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "2,2,Hello\n" +
- "3,2,Hello world\n";
- }
- case 2: {
-
- /*
- * check correctness of distinct on tuples with key field selector with not all fields selected
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0);
-
- distinctDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1\n" +
- "2\n";
- }
- case 3: {
-
- /*
- * check correctness of distinct on tuples with key extractor function
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
- .distinct(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Integer getKey(Tuple5<Integer, Long, Integer, String, Long> in) {
- return in.f0;
- }
- }).project(0);
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1\n" +
- "2\n";
-
- }
- case 4: {
-
- /*
- * check correctness of distinct on custom type with type extractor
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<Tuple1<Integer>> reduceDs = ds
- .distinct(new KeySelector<CustomType, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Integer getKey(CustomType in) {
- return in.myInt;
- }
- })
- .map(new RichMapFunction<CustomType, Tuple1<Integer>>() {
- @Override
- public Tuple1<Integer> map(CustomType value) throws Exception {
- return new Tuple1<Integer>(value.myInt);
- }
- });
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1\n" +
- "2\n" +
- "3\n" +
- "4\n" +
- "5\n" +
- "6\n";
-
- }
- case 5: {
-
- /*
- * check correctness of distinct on tuples
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct();
-
- distinctDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "2,2,Hello\n" +
- "3,2,Hello world\n";
- }
- case 6: {
-
- /*
- * check correctness of distinct on custom type with tuple-returning type extractor
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple2<Integer, Long>> reduceDs = ds
- .distinct(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Tuple2<Integer,Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
- return new Tuple2<Integer, Long>(t.f0, t.f4);
- }
- })
+
+ @Test
+ public void testCorrectnessOfDistinctOnCustomTypeWithTypeExtractor() throws Exception {
+ /*
+ * check correctness of distinct on custom type with type extractor
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<Tuple1<Integer>> reduceDs = ds
+ .distinct(new KeySelector3())
+ .map(new Mapper3());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1\n" +
+ "2\n" +
+ "3\n" +
+ "4\n" +
+ "5\n" +
+ "6\n";
+ }
+
+ public static class Mapper3 extends RichMapFunction<CustomType, Tuple1<Integer>> {
+ @Override
+ public Tuple1<Integer> map(CustomType value) throws Exception {
+ return new Tuple1<Integer>(value.myInt);
+ }
+ }
+
+ public static class KeySelector3 implements KeySelector<CustomType, Integer> {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Integer getKey(CustomType in) {
+ return in.myInt;
+ }
+ }
+
+ @Test
+ public void testCorrectnessOfDistinctOnTuples() throws Exception{
+ /*
+ * check correctness of distinct on tuples
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct();
+
+ distinctDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "2,2,Hello\n" +
+ "3,2,Hello world\n";
+ }
+
+ @Test
+ public void testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor() throws
+ Exception{
+ /*
+ * check correctness of distinct on custom type with tuple-returning type extractor
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<Integer, Long>> reduceDs = ds
+ .distinct(new KeySelector2())
.project(0,4);
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1\n" +
- "2,1\n" +
- "2,2\n" +
- "3,2\n" +
- "3,3\n" +
- "4,1\n" +
- "4,2\n" +
- "5,1\n" +
- "5,2\n" +
- "5,3\n";
- }
- case 7: {
-
- /*
- * check correctness of distinct on tuples with field expressions
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1\n" +
+ "2,1\n" +
+ "2,2\n" +
+ "3,2\n" +
+ "3,3\n" +
+ "4,1\n" +
+ "4,2\n" +
+ "5,1\n" +
+ "5,2\n" +
+ "5,3\n";
+ }
+
+ public static class KeySelector2 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Tuple2<Integer,Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }
+
+ @Test
+ public void testCorrectnessOfDistinctOnTuplesWithFieldExpressions() throws Exception {
+ /*
+ * check correctness of distinct on tuples with field expressions
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
.distinct("f0").project(0);
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1\n" +
- "2\n";
-
- }
- case 8: {
-
- /*
- * check correctness of distinct on Pojos
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
- DataSet<Integer> reduceDs = ds.distinct("nestedPojo.longNumber").map(new MapFunction<CollectionDataSets.POJO, Integer>() {
- @Override
- public Integer map(POJO value) throws Exception {
- return (int) value.nestedPojo.longNumber;
- }
- });
-
- reduceDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "10000\n20000\n30000\n";
- }
- case 9: {
-
- /*
- * distinct on full Pojo
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
- DataSet<Integer> reduceDs = ds.distinct().map(new MapFunction<CollectionDataSets.POJO, Integer>() {
- @Override
- public Integer map(POJO value) throws Exception {
- return (int) value.nestedPojo.longNumber;
- }
- });
-
- reduceDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "10000\n20000\n30000\n";
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1\n" +
+ "2\n";
+ }
+
+ @Test
+ public void testCorrectnessOfDistinctOnPojos() throws Exception {
+ /*
+ * check correctness of distinct on Pojos
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
+ DataSet<Integer> reduceDs = ds.distinct("nestedPojo.longNumber").map(new Mapper2());
+
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "10000\n20000\n30000\n";
+ }
+
+ public static class Mapper2 implements MapFunction<CollectionDataSets.POJO, Integer> {
+ @Override
+ public Integer map(POJO value) throws Exception {
+ return (int) value.nestedPojo.longNumber;
+ }
+ }
+
+ @Test
+ public void testDistinctOnFullPojo() throws Exception {
+ /*
+ * distinct on full Pojo
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
+ DataSet<Integer> reduceDs = ds.distinct().map(new Mapper1());
+
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "10000\n20000\n30000\n";
+ }
+
+ public static class Mapper1 implements MapFunction<CollectionDataSets.POJO, Integer> {
+ @Override
+ public Integer map(POJO value) throws Exception {
+ return (int) value.nestedPojo.longNumber;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
index fbd968d..3bf83b4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
@@ -18,10 +18,7 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.Collection;
-import java.util.LinkedList;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -29,315 +26,311 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
-public class FilterITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 8;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+public class FilterITCase extends MultipleProgramsTestBase {
+ public FilterITCase(ExecutionMode mode){
+ super(mode);
+ }
+
private String resultPath;
- private String expectedResult;
-
- public FilterITCase(Configuration config) {
- super(config);
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = FilterProgs.runProgram(curProgId, resultPath);
+ @Test
+ public void testAllRejectingFilter() throws Exception {
+ /*
+ * Test all-rejecting filter.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+ filter(new Filter1());
+
+ filterDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "\n";
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ public static class Filter1 implements FilterFunction<Tuple3<Integer,Long,String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+ return false;
+ }
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ @Test
+ public void testAllPassingFilter() throws Exception {
+ /*
+ * Test all-passing filter.
+ */
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+ filter(new Filter2());
+ filterDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "2,2,Hello\n" +
+ "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" +
+ "5,3,I am fine.\n" +
+ "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" +
+ "8,4,Comment#2\n" +
+ "9,4,Comment#3\n" +
+ "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" +
+ "12,5,Comment#6\n" +
+ "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" +
+ "16,6,Comment#10\n" +
+ "17,6,Comment#11\n" +
+ "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" +
+ "20,6,Comment#14\n" +
+ "21,6,Comment#15\n";
+ }
+
+ public static class Filter2 implements FilterFunction<Tuple3<Integer,Long,String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+ return true;
}
-
- return toParameterList(tConfigs);
}
-
- private static class FilterProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
- /*
- * Test all-rejecting filter.
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
- filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
- return false;
- }
- });
-
- filterDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "\n";
- }
- case 2: {
- /*
- * Test all-passing filter.
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
- filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
- return true;
- }
- });
- filterDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "2,2,Hello\n" +
- "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" +
- "5,3,I am fine.\n" +
- "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" +
- "8,4,Comment#2\n" +
- "9,4,Comment#3\n" +
- "10,4,Comment#4\n" +
- "11,5,Comment#5\n" +
- "12,5,Comment#6\n" +
- "13,5,Comment#7\n" +
- "14,5,Comment#8\n" +
- "15,5,Comment#9\n" +
- "16,6,Comment#10\n" +
- "17,6,Comment#11\n" +
- "18,6,Comment#12\n" +
- "19,6,Comment#13\n" +
- "20,6,Comment#14\n" +
- "21,6,Comment#15\n";
- }
- case 3: {
- /*
- * Test filter on String tuple field.
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
- filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
- return value.f2.contains("world");
- }
- });
- filterDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n";
-
- }
- case 4: {
- /*
+
+ @Test
+ public void testFilterOnStringTupleField() throws Exception {
+ /*
+ * Test filter on String tuple field.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+ filter(new Filter3());
+ filterDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n";
+
+ }
+
+ public static class Filter3 implements FilterFunction<Tuple3<Integer,Long,String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+ return value.f2.contains("world");
+ }
+ }
+
+ @Test
+ public void testFilterOnIntegerTupleField() throws Exception {
+ /*
* Test filter on Integer tuple field.
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
- filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
- return (value.f0 % 2) == 0;
- }
- });
- filterDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "2,2,Hello\n" +
- "4,3,Hello world, how are you?\n" +
- "6,3,Luke Skywalker\n" +
- "8,4,Comment#2\n" +
- "10,4,Comment#4\n" +
- "12,5,Comment#6\n" +
- "14,5,Comment#8\n" +
- "16,6,Comment#10\n" +
- "18,6,Comment#12\n" +
- "20,6,Comment#14\n";
- }
- case 5: {
- /*
- * Test filter on basic type
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
- DataSet<String> filterDs = ds.
- filter(new FilterFunction<String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(String value) throws Exception {
- return value.startsWith("H");
- }
- });
- filterDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "Hi\n" +
- "Hello\n" +
- "Hello world\n" +
- "Hello world, how are you?\n";
- }
- case 6: {
- /*
- * Test filter on custom type
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> filterDs = ds.
- filter(new FilterFunction<CustomType>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(CustomType value) throws Exception {
- return value.myString.contains("a");
- }
- });
- filterDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "3,3,Hello world, how are you?\n" +
- "3,4,I am fine.\n" +
- "3,5,Luke Skywalker\n";
- }
- case 7: {
- /*
- * Test filter on String tuple field.
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
- filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
- private static final long serialVersionUID = 1L;
-
- int literal = -1;
-
- @Override
- public void open(Configuration config) {
- Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
- for(int i: ints) {
- literal = literal < i ? i : literal;
- }
- }
-
- @Override
- public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
- return value.f0 < literal;
- }
- }).withBroadcastSet(ints, "ints");
- filterDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "2,2,Hello\n" +
- "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n";
- }
- case 8: {
- /*
- * Test filter with broadcast variables
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
- filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
- private static final long serialVersionUID = 1L;
- private int broadcastSum = 0;
-
- @Override
- public void open(Configuration config) {
- Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
- for(Integer i : ints) {
- broadcastSum += i;
- }
- }
-
- @Override
- public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
- return (value.f1 == (broadcastSum / 11));
- }
- }).withBroadcastSet(intDs, "ints");
- filterDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "11,5,Comment#5\n" +
- "12,5,Comment#6\n" +
- "13,5,Comment#7\n" +
- "14,5,Comment#8\n" +
- "15,5,Comment#9\n";
-
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+ filter(new Filter4());
+ filterDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "2,2,Hello\n" +
+ "4,3,Hello world, how are you?\n" +
+ "6,3,Luke Skywalker\n" +
+ "8,4,Comment#2\n" +
+ "10,4,Comment#4\n" +
+ "12,5,Comment#6\n" +
+ "14,5,Comment#8\n" +
+ "16,6,Comment#10\n" +
+ "18,6,Comment#12\n" +
+ "20,6,Comment#14\n";
+ }
+
+ public static class Filter4 implements FilterFunction<Tuple3<Integer,Long,String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+ return (value.f0 % 2) == 0;
+ }
+ }
+
+ @Test
+ public void testFilterBasicType() throws Exception {
+ /*
+ * Test filter on basic type
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+ DataSet<String> filterDs = ds.
+ filter(new Filter5());
+ filterDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "Hi\n" +
+ "Hello\n" +
+ "Hello world\n" +
+ "Hello world, how are you?\n";
+ }
+
+ public static class Filter5 implements FilterFunction<String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(String value) throws Exception {
+ return value.startsWith("H");
+ }
+ }
+
+ @Test
+ public void testFilterOnCustomType() throws Exception {
+ /*
+ * Test filter on custom type
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> filterDs = ds.
+ filter(new Filter6());
+ filterDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "3,3,Hello world, how are you?\n" +
+ "3,4,I am fine.\n" +
+ "3,5,Luke Skywalker\n";
+ }
+
+ public static class Filter6 implements FilterFunction<CustomType> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(CustomType value) throws Exception {
+ return value.myString.contains("a");
+ }
+ }
+
+ @Test
+ public void testRichFilterOnStringTupleField() throws Exception {
+ /*
+ * Test filter on String tuple field.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+ filter(new RichFilter1()).withBroadcastSet(ints, "ints");
+ filterDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "2,2,Hello\n" +
+ "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n";
+ }
+
+ public static class RichFilter1 extends RichFilterFunction<Tuple3<Integer,Long,String>> {
+ private static final long serialVersionUID = 1L;
+
+ int literal = -1;
+
+ @Override
+ public void open(Configuration config) {
+ Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+ for(int i: ints) {
+ literal = literal < i ? i : literal;
}
- default:
- throw new IllegalArgumentException("Invalid program id");
+ }
+
+ @Override
+ public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+ return value.f0 < literal;
+ }
+ }
+
+ @Test
+ public void testFilterWithBroadcastVariables() throws Exception {
+ /*
+ * Test filter with broadcast variables
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+ filter(new RichFilter2()).withBroadcastSet(intDs, "ints");
+ filterDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "11,5,Comment#5\n" +
+ "12,5,Comment#6\n" +
+ "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" +
+ "15,5,Comment#9\n";
+ }
+
+ public static class RichFilter2 extends RichFilterFunction<Tuple3<Integer,Long,String>> {
+ private static final long serialVersionUID = 1L;
+ private int broadcastSum = 0;
+
+ @Override
+ public void open(Configuration config) {
+ Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+ for(Integer i : ints) {
+ broadcastSum += i;
}
}
+
+ @Override
+ public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
+ return (value.f1 == (broadcastSum / 11));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
index 770cf88..24bc3e6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
@@ -18,11 +18,6 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
@@ -30,122 +25,94 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
-public class FirstNITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 3;
-
- private int curProgId = config.getInteger("ProgramId", -1);
- private String resultPath;
- private String expectedResult;
-
- public FirstNITCase(Configuration config) {
- super(config);
+public class FirstNITCase extends MultipleProgramsTestBase {
+ public FirstNITCase(ExecutionMode mode){
+ super(mode);
}
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+
+ private String resultPath;
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = FirstNProgs.runProgram(curProgId, resultPath);
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ @Test
+ public void testFirstNOnUngroupedDS() throws Exception {
+ /*
+ * First-n on ungrouped data set
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);
+
+ seven.writeAsText(resultPath);
+ env.execute();
+
+ expected = "(7)\n";
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ @Test
+ public void testFirstNOnGroupedDS() throws Exception {
+ /*
+ * First-n on grouped data set
+ */
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
+ .map(new OneMapper2()).groupBy(0).sum(1);
+
+ first.writeAsText(resultPath);
+ env.execute();
+
+ expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
}
-
- private static class FirstNProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
- /*
- * First-n on ungrouped data set
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);
-
- seven.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "(7)\n";
- }
- case 2: {
- /*
- * First-n on grouped data set
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
- .map(new OneMapper2()).groupBy(0).sum(1);
-
- first.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
- }
- case 3: {
- /*
- * First-n on grouped and sorted data set
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
+
+ @Test
+ public void testFirstNOnGroupedAndSortedDS() throws Exception {
+ /*
+ * First-n on grouped and sorted data set
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
.project(1,0);
-
- first.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "(1,1)\n"
- + "(2,3)\n(2,2)\n"
- + "(3,6)\n(3,5)\n(3,4)\n"
- + "(4,10)\n(4,9)\n(4,8)\n"
- + "(5,15)\n(5,14)\n(5,13)\n"
- + "(6,21)\n(6,20)\n(6,19)\n";
-
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
-
- }
-
+
+ first.writeAsText(resultPath);
+ env.execute();
+
+ expected = "(1,1)\n"
+ + "(2,3)\n(2,2)\n"
+ + "(3,6)\n(3,5)\n(3,4)\n"
+ + "(4,10)\n(4,9)\n(4,8)\n"
+ + "(5,15)\n(5,14)\n(5,13)\n"
+ + "(6,21)\n(6,20)\n(6,19)\n";
}
public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
index 906ba05..bf49eae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
@@ -18,10 +18,7 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.Collection;
-import java.util.LinkedList;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -29,356 +26,349 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
-public class FlatMapITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 7;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+public class FlatMapITCase extends MultipleProgramsTestBase {
+ public FlatMapITCase(ExecutionMode mode){
+ super(mode);
+ }
+
private String resultPath;
- private String expectedResult;
-
- public FlatMapITCase(Configuration config) {
- super(config);
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = FlatMapProgs.runProgram(curProgId, resultPath);
+ @Test
+ public void testNonPassingFlatMap() throws Exception {
+ /*
+ * Test non-passing flatmap
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+ DataSet<String> nonPassingFlatMapDs = ds.
+ flatMap(new FlatMapper1());
+
+ nonPassingFlatMapDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "\n";
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ public static class FlatMapper1 implements FlatMapFunction<String, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(String value, Collector<String> out) throws Exception {
+ if ( value.contains("bananas") ) {
+ out.collect(value);
+ }
+ }
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ @Test
+ public void testDataDuplicatingFlatMap() throws Exception {
+ /*
+ * Test data duplicating flatmap
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+ DataSet<String> duplicatingFlatMapDs = ds.
+ flatMap(new FlatMapper2());
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
+ duplicatingFlatMapDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "Hi\n" + "HI\n" +
+ "Hello\n" + "HELLO\n" +
+ "Hello world\n" + "HELLO WORLD\n" +
+ "Hello world, how are you?\n" + "HELLO WORLD, HOW ARE YOU?\n" +
+ "I am fine.\n" + "I AM FINE.\n" +
+ "Luke Skywalker\n" + "LUKE SKYWALKER\n" +
+ "Random comment\n" + "RANDOM COMMENT\n" +
+ "LOL\n" + "LOL\n";
+ }
+
+ public static class FlatMapper2 implements FlatMapFunction<String, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(String value, Collector<String> out) throws Exception {
+ out.collect(value);
+ out.collect(value.toUpperCase());
}
-
- return toParameterList(tConfigs);
}
-
- private static class FlatMapProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
- /*
- * Test non-passing flatmap
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
- DataSet<String> nonPassingFlatMapDs = ds.
- flatMap(new FlatMapFunction<String, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(String value, Collector<String> out) throws Exception {
- if ( value.contains("bananas") ) {
- out.collect(value);
- }
- }
- });
-
- nonPassingFlatMapDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "\n";
- }
- case 2: {
- /*
- * Test data duplicating flatmap
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
- DataSet<String> duplicatingFlatMapDs = ds.
- flatMap(new FlatMapFunction<String, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(String value, Collector<String> out) throws Exception {
- out.collect(value);
- out.collect(value.toUpperCase());
- }
- });
-
- duplicatingFlatMapDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "Hi\n" + "HI\n" +
- "Hello\n" + "HELLO\n" +
- "Hello world\n" + "HELLO WORLD\n" +
- "Hello world, how are you?\n" + "HELLO WORLD, HOW ARE YOU?\n" +
- "I am fine.\n" + "I AM FINE.\n" +
- "Luke Skywalker\n" + "LUKE SKYWALKER\n" +
- "Random comment\n" + "RANDOM COMMENT\n" +
- "LOL\n" + "LOL\n";
- }
- case 3: {
- /*
- * Test flatmap with varying number of emitted tuples
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> varyingTuplesMapDs = ds.
- flatMap(new FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Tuple3<Integer, Long, String> value,
- Collector<Tuple3<Integer, Long, String>> out) throws Exception {
- final int numTuples = value.f0 % 3;
- for ( int i = 0; i < numTuples; i++ ) {
- out.collect(value);
- }
- }
- });
-
- varyingTuplesMapDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "2,2,Hello\n" + "2,2,Hello\n" +
- "4,3,Hello world, how are you?\n" +
- "5,3,I am fine.\n" + "5,3,I am fine.\n" +
- "7,4,Comment#1\n" +
- "8,4,Comment#2\n" + "8,4,Comment#2\n" +
- "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "11,5,Comment#5\n" +
- "13,5,Comment#7\n" +
- "14,5,Comment#8\n" + "14,5,Comment#8\n" +
- "16,6,Comment#10\n" +
- "17,6,Comment#11\n" + "17,6,Comment#11\n" +
- "19,6,Comment#13\n" +
- "20,6,Comment#14\n" + "20,6,Comment#14\n";
- }
- case 4: {
- /*
- * Test type conversion flatmapper (Custom -> Tuple)
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> typeConversionFlatMapDs = ds.
- flatMap(new FlatMapFunction<CustomType, Tuple3<Integer, Long, String>>() {
- private static final long serialVersionUID = 1L;
- private final Tuple3<Integer, Long, String> outTuple =
- new Tuple3<Integer, Long, String>();
-
- @Override
- public void flatMap(CustomType value, Collector<Tuple3<Integer, Long, String>> out)
- throws Exception {
- outTuple.setField(value.myInt, 0);
- outTuple.setField(value.myLong, 1);
- outTuple.setField(value.myString, 2);
- out.collect(outTuple);
- }
- });
-
- typeConversionFlatMapDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,0,Hi\n" +
- "2,1,Hello\n" +
- "2,2,Hello world\n" +
- "3,3,Hello world, how are you?\n" +
- "3,4,I am fine.\n" +
- "3,5,Luke Skywalker\n" +
- "4,6,Comment#1\n" +
- "4,7,Comment#2\n" +
- "4,8,Comment#3\n" +
- "4,9,Comment#4\n" +
- "5,10,Comment#5\n" +
- "5,11,Comment#6\n" +
- "5,12,Comment#7\n" +
- "5,13,Comment#8\n" +
- "5,14,Comment#9\n" +
- "6,15,Comment#10\n" +
- "6,16,Comment#11\n" +
- "6,17,Comment#12\n" +
- "6,18,Comment#13\n" +
- "6,19,Comment#14\n" +
- "6,20,Comment#15\n";
- }
- case 5: {
- /*
- * Test type conversion flatmapper (Tuple -> Basic)
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<String> typeConversionFlatMapDs = ds.
- flatMap(new FlatMapFunction<Tuple3<Integer, Long, String>, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Tuple3<Integer, Long, String> value,
- Collector<String> out) throws Exception {
- out.collect(value.f2);
- }
- });
-
- typeConversionFlatMapDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "Hi\n" + "Hello\n" + "Hello world\n" +
- "Hello world, how are you?\n" +
- "I am fine.\n" + "Luke Skywalker\n" +
- "Comment#1\n" + "Comment#2\n" +
- "Comment#3\n" + "Comment#4\n" +
- "Comment#5\n" + "Comment#6\n" +
- "Comment#7\n" + "Comment#8\n" +
- "Comment#9\n" + "Comment#10\n" +
- "Comment#11\n" + "Comment#12\n" +
- "Comment#13\n" + "Comment#14\n" +
- "Comment#15\n";
- }
- case 6: {
- /*
- * Test flatmapper if UDF returns input object
- * multiple times and changes it in between
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> inputObjFlatMapDs = ds.
- flatMap(new FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap( Tuple3<Integer, Long, String> value,
- Collector<Tuple3<Integer, Long, String>> out) throws Exception {
- final int numTuples = value.f0 % 4;
- for ( int i = 0; i < numTuples; i++ ) {
- value.setField(i, 0);
- out.collect(value);
- }
- }
- });
-
- inputObjFlatMapDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "0,1,Hi\n" +
- "0,2,Hello\n" + "1,2,Hello\n" +
- "0,2,Hello world\n" + "1,2,Hello world\n" + "2,2,Hello world\n" +
- "0,3,I am fine.\n" +
- "0,3,Luke Skywalker\n" + "1,3,Luke Skywalker\n" +
- "0,4,Comment#1\n" + "1,4,Comment#1\n" + "2,4,Comment#1\n" +
- "0,4,Comment#3\n" +
- "0,4,Comment#4\n" + "1,4,Comment#4\n" +
- "0,5,Comment#5\n" + "1,5,Comment#5\n" + "2,5,Comment#5\n" +
- "0,5,Comment#7\n" +
- "0,5,Comment#8\n" + "1,5,Comment#8\n" +
- "0,5,Comment#9\n" + "1,5,Comment#9\n" + "2,5,Comment#9\n" +
- "0,6,Comment#11\n" +
- "0,6,Comment#12\n" + "1,6,Comment#12\n" +
- "0,6,Comment#13\n" + "1,6,Comment#13\n" + "2,6,Comment#13\n" +
- "0,6,Comment#15\n";
+
+ @Test
+ public void testFlatMapWithVaryingNumberOfEmittedTuples() throws Exception {
+ /*
+ * Test flatmap with varying number of emitted tuples
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> varyingTuplesMapDs = ds.
+ flatMap(new FlatMapper3());
+
+ varyingTuplesMapDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "2,2,Hello\n" + "2,2,Hello\n" +
+ "4,3,Hello world, how are you?\n" +
+ "5,3,I am fine.\n" + "5,3,I am fine.\n" +
+ "7,4,Comment#1\n" +
+ "8,4,Comment#2\n" + "8,4,Comment#2\n" +
+ "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "11,5,Comment#5\n" +
+ "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" + "14,5,Comment#8\n" +
+ "16,6,Comment#10\n" +
+ "17,6,Comment#11\n" + "17,6,Comment#11\n" +
+ "19,6,Comment#13\n" +
+ "20,6,Comment#14\n" + "20,6,Comment#14\n";
+ }
+
+ public static class FlatMapper3 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(Tuple3<Integer, Long, String> value,
+ Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+ final int numTuples = value.f0 % 3;
+ for ( int i = 0; i < numTuples; i++ ) {
+ out.collect(value);
}
- case 7: {
- /*
- * Test flatmap with broadcast set
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds.
- flatMap(new RichFlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
- private static final long serialVersionUID = 1L;
- private final Tuple3<Integer, Long, String> outTuple =
- new Tuple3<Integer, Long, String>();
- private Integer f2Replace = 0;
-
- @Override
- public void open(Configuration config) {
- Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
- int sum = 0;
- for(Integer i : ints) {
- sum += i;
- }
- f2Replace = sum;
- }
-
- @Override
- public void flatMap(Tuple3<Integer, Long, String> value,
- Collector<Tuple3<Integer, Long, String>> out) throws Exception {
- outTuple.setFields(f2Replace, value.f1, value.f2);
- out.collect(outTuple);
- }
- }).withBroadcastSet(ints, "ints");
- bcFlatMapDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "55,1,Hi\n" +
- "55,2,Hello\n" +
- "55,2,Hello world\n" +
- "55,3,Hello world, how are you?\n" +
- "55,3,I am fine.\n" +
- "55,3,Luke Skywalker\n" +
- "55,4,Comment#1\n" +
- "55,4,Comment#2\n" +
- "55,4,Comment#3\n" +
- "55,4,Comment#4\n" +
- "55,5,Comment#5\n" +
- "55,5,Comment#6\n" +
- "55,5,Comment#7\n" +
- "55,5,Comment#8\n" +
- "55,5,Comment#9\n" +
- "55,6,Comment#10\n" +
- "55,6,Comment#11\n" +
- "55,6,Comment#12\n" +
- "55,6,Comment#13\n" +
- "55,6,Comment#14\n" +
- "55,6,Comment#15\n";
+ }
+ }
+
+ @Test
+ public void testTypeConversionFlatMapperCustomToTuple() throws Exception {
+ /*
+ * Test type conversion flatmapper (Custom -> Tuple)
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> typeConversionFlatMapDs = ds.
+ flatMap(new FlatMapper4());
+
+ typeConversionFlatMapDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,0,Hi\n" +
+ "2,1,Hello\n" +
+ "2,2,Hello world\n" +
+ "3,3,Hello world, how are you?\n" +
+ "3,4,I am fine.\n" +
+ "3,5,Luke Skywalker\n" +
+ "4,6,Comment#1\n" +
+ "4,7,Comment#2\n" +
+ "4,8,Comment#3\n" +
+ "4,9,Comment#4\n" +
+ "5,10,Comment#5\n" +
+ "5,11,Comment#6\n" +
+ "5,12,Comment#7\n" +
+ "5,13,Comment#8\n" +
+ "5,14,Comment#9\n" +
+ "6,15,Comment#10\n" +
+ "6,16,Comment#11\n" +
+ "6,17,Comment#12\n" +
+ "6,18,Comment#13\n" +
+ "6,19,Comment#14\n" +
+ "6,20,Comment#15\n";
+ }
+
+ public static class FlatMapper4 implements FlatMapFunction<CustomType, Tuple3<Integer, Long, String>> {
+ private static final long serialVersionUID = 1L;
+ private final Tuple3<Integer, Long, String> outTuple =
+ new Tuple3<Integer, Long, String>();
+
+ @Override
+ public void flatMap(CustomType value, Collector<Tuple3<Integer, Long, String>> out)
+ throws Exception {
+ outTuple.setField(value.myInt, 0);
+ outTuple.setField(value.myLong, 1);
+ outTuple.setField(value.myString, 2);
+ out.collect(outTuple);
+ }
+ }
+
+ @Test
+ public void testTypeConversionFlatMapperTupleToBasic() throws Exception {
+ /*
+ * Test type conversion flatmapper (Tuple -> Basic)
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<String> typeConversionFlatMapDs = ds.
+ flatMap(new FlatMapper5());
+
+ typeConversionFlatMapDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "Hi\n" + "Hello\n" + "Hello world\n" +
+ "Hello world, how are you?\n" +
+ "I am fine.\n" + "Luke Skywalker\n" +
+ "Comment#1\n" + "Comment#2\n" +
+ "Comment#3\n" + "Comment#4\n" +
+ "Comment#5\n" + "Comment#6\n" +
+ "Comment#7\n" + "Comment#8\n" +
+ "Comment#9\n" + "Comment#10\n" +
+ "Comment#11\n" + "Comment#12\n" +
+ "Comment#13\n" + "Comment#14\n" +
+ "Comment#15\n";
+ }
+
+ public static class FlatMapper5 implements FlatMapFunction<Tuple3<Integer, Long, String>,String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(Tuple3<Integer, Long, String> value,
+ Collector<String> out) throws Exception {
+ out.collect(value.f2);
+ }
+ }
+
+ @Test
+ public void testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt() throws
+ Exception {
+ /*
+ * Test flatmapper if UDF returns input object
+ * multiple times and changes it in between
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> inputObjFlatMapDs = ds.
+ flatMap(new FlatMapper6());
+
+ inputObjFlatMapDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "0,1,Hi\n" +
+ "0,2,Hello\n" + "1,2,Hello\n" +
+ "0,2,Hello world\n" + "1,2,Hello world\n" + "2,2,Hello world\n" +
+ "0,3,I am fine.\n" +
+ "0,3,Luke Skywalker\n" + "1,3,Luke Skywalker\n" +
+ "0,4,Comment#1\n" + "1,4,Comment#1\n" + "2,4,Comment#1\n" +
+ "0,4,Comment#3\n" +
+ "0,4,Comment#4\n" + "1,4,Comment#4\n" +
+ "0,5,Comment#5\n" + "1,5,Comment#5\n" + "2,5,Comment#5\n" +
+ "0,5,Comment#7\n" +
+ "0,5,Comment#8\n" + "1,5,Comment#8\n" +
+ "0,5,Comment#9\n" + "1,5,Comment#9\n" + "2,5,Comment#9\n" +
+ "0,6,Comment#11\n" +
+ "0,6,Comment#12\n" + "1,6,Comment#12\n" +
+ "0,6,Comment#13\n" + "1,6,Comment#13\n" + "2,6,Comment#13\n" +
+ "0,6,Comment#15\n";
+ }
+
+ public static class FlatMapper6 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap( Tuple3<Integer, Long, String> value,
+ Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+ final int numTuples = value.f0 % 4;
+ for ( int i = 0; i < numTuples; i++ ) {
+ value.setField(i, 0);
+ out.collect(value);
}
- default:
- throw new IllegalArgumentException("Invalid program id");
+ }
+ }
+
+ @Test
+ public void testFlatMapWithBroadcastSet() throws Exception {
+ /*
+ * Test flatmap with broadcast set
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds.
+ flatMap(new RichFlatMapper1()).withBroadcastSet(ints, "ints");
+ bcFlatMapDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "55,1,Hi\n" +
+ "55,2,Hello\n" +
+ "55,2,Hello world\n" +
+ "55,3,Hello world, how are you?\n" +
+ "55,3,I am fine.\n" +
+ "55,3,Luke Skywalker\n" +
+ "55,4,Comment#1\n" +
+ "55,4,Comment#2\n" +
+ "55,4,Comment#3\n" +
+ "55,4,Comment#4\n" +
+ "55,5,Comment#5\n" +
+ "55,5,Comment#6\n" +
+ "55,5,Comment#7\n" +
+ "55,5,Comment#8\n" +
+ "55,5,Comment#9\n" +
+ "55,6,Comment#10\n" +
+ "55,6,Comment#11\n" +
+ "55,6,Comment#12\n" +
+ "55,6,Comment#13\n" +
+ "55,6,Comment#14\n" +
+ "55,6,Comment#15\n";
+ }
+
+ public static class RichFlatMapper1 extends RichFlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>> {
+ private static final long serialVersionUID = 1L;
+ private final Tuple3<Integer, Long, String> outTuple =
+ new Tuple3<Integer, Long, String>();
+ private Integer f2Replace = 0;
+
+ @Override
+ public void open(Configuration config) {
+ Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+ int sum = 0;
+ for(Integer i : ints) {
+ sum += i;
}
-
+ f2Replace = sum;
+ }
+
+ @Override
+ public void flatMap(Tuple3<Integer, Long, String> value,
+ Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+ outTuple.setFields(f2Replace, value.f1, value.f2);
+ out.collect(outTuple);
}
-
}
}