You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:11 UTC
[75/82] [abbrv] incubator-flink git commit: Change integration tests to reuse the cluster in order to save startup and shutdown time.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
index bc026c9..6fe549f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
@@ -18,82 +18,68 @@
package org.apache.flink.test.exampleScalaPrograms;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
+import java.io.File;
-import org.apache.flink.configuration.Configuration;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
import org.apache.flink.examples.scala.graph.PageRankBasic;
import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
-public class PageRankITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 2;
-
- private int curProgId = config.getInteger("ProgramId", -1);
-
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+ public PageRankITCase(ExecutionMode mode){
+ super(mode);
+ }
+
private String verticesPath;
private String edgesPath;
private String resultPath;
- private String expectedResult;
-
- public PageRankITCase(Configuration config) {
- super(config);
- }
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
- edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
- }
+ private String expected;
- @Override
- protected void testProgram() throws Exception {
- expectedResult = runProgram(curProgId);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ File resultFile = tempFolder.newFile();
+ //Delete file because the Scala API does not respect WriteMode set by the configuration
+ resultFile.delete();
+ resultPath = resultFile.toURI().toString();
+
+ File verticesFile = tempFolder.newFile();
+ Files.write(PageRankData.VERTICES, verticesFile, Charsets.UTF_8);
+
+ File edgesFile = tempFolder.newFile();
+ Files.write(PageRankData.EDGES, edgesFile, Charsets.UTF_8);
+
+ verticesPath = verticesFile.toURI().toString();
+ edgesPath = edgesFile.toURI().toString();
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ @After
+ public void after() throws Exception{
+ compareKeyValueParisWithDelta(expected, resultPath, " ", 0.01);
+ }
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
+ @Test
+ public void testPageRankWithSmallNumberOfIterations() throws Exception {
+ PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"});
+ expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
}
-
- public String runProgram(int progId) throws Exception {
-
- switch(progId) {
- case 1: {
- PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"});
- return PageRankData.RANKS_AFTER_3_ITERATIONS;
- }
- case 2: {
- // start with a very high number of iteration such that the dynamic convergence criterion must handle termination
- PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
- return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
- }
-
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
+ @Test
+ public void testPageRankWithConvergence() throws Exception {
+ // start with a very high number of iterations so that the dynamic convergence criterion must handle termination
+ PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
+ expected = PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 674ca49..aae7168 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -18,12 +18,10 @@
package org.apache.flink.test.iterative.aggregators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
import java.util.Random;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
import org.junit.Assert;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -33,12 +31,14 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -49,213 +49,185 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
*
*/
@RunWith(Parameterized.class)
-public class AggregatorsITCase extends JavaProgramTestBase {
+public class AggregatorsITCase extends MultipleProgramsTestBase {
- private static final int NUM_PROGRAMS = 5;
- private static final int MAX_ITERATIONS = 20;
+ private static final int MAX_ITERATIONS = 20;
private static final int DOP = 2;
+ private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
+
+ public AggregatorsITCase(ExecutionMode mode){
+ super(mode);
+ }
- private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
- private String expectedResult;
+ private String expected;
- public AggregatorsITCase(Configuration config) {
- super(config);
- }
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = AggregatorProgs.runProgram(curProgId, resultPath);
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
}
- @Override
- protected void postSubmit() throws Exception {
+ @Test
+ public void testAggregatorWithoutParameterForIterate() throws Exception {
+ /*
+ * Test aggregator without parameter for iterate
+ */
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DOP);
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+ DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
+ IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ // register aggregator
+ LongSumAggregator aggr = new LongSumAggregator();
+ iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
+ // register convergence criterion
+ iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
+ new NegativeElementsConvergenceCriterion());
+
+ DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
+ iteration.closeWith(updatedDs).writeAsText(resultPath);
+ env.execute();
- return toParameterList(tConfigs);
+ expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+ + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+ + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
}
- private static class AggregatorProgs {
+ @Test
+ public void testAggregatorWithParameterForIterate() throws Exception {
+ /*
+ * Test aggregator with parameter for iterate
+ */
- private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DOP);
- public static String runProgram(int progId, String resultPath) throws Exception {
+ DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
+ IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
- switch(progId) {
- case 1: {
- /*
- * Test aggregator without parameter for iterate
- */
+ // register aggregator
+ LongSumAggregatorWithParameter aggr = new LongSumAggregatorWithParameter(0);
+ iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
-
- DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
- IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
-
- // register aggregator
- LongSumAggregator aggr = new LongSumAggregator();
- iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-
- // register convergence criterion
- iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
- new NegativeElementsConvergenceCriterion());
-
- DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
- iteration.closeWith(updatedDs).writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
- + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
- }
- case 2: {
- /*
- * Test aggregator with parameter for iterate
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
-
- DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
- IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
-
- // register aggregator
- LongSumAggregatorWithParameter aggr = new LongSumAggregatorWithParameter(0);
- iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-
- // register convergence criterion
- iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
- new NegativeElementsConvergenceCriterion());
-
- DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
- iteration.closeWith(updatedDs).writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
- + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
- }
- case 3: {
- /*
+ // register convergence criterion
+ iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
+ new NegativeElementsConvergenceCriterion());
+
+ DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
+ iteration.closeWith(updatedDs).writeAsText(resultPath);
+ env.execute();
+
+ expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+ + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+ + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+ }
+
+ @Test
+ public void testConvergenceCriterionWithParameterForIterate() throws Exception {
+ /*
* Test convergence criterion with parameter for iterate
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
-
- DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
- IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
-
- // register aggregator
- LongSumAggregator aggr = new LongSumAggregator();
- iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-
- // register convergence criterion
- iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
- new NegativeElementsConvergenceCriterionWithParam(3));
-
- DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
- iteration.closeWith(updatedDs).writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
- + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
- }
- case 4: {
- /*
- * Test aggregator without parameter for iterateDelta
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
-
- DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
-
- DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta(
- initialSolutionSet, MAX_ITERATIONS, 0);
-
- // register aggregator
- LongSumAggregator aggr = new LongSumAggregator();
- iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-
- DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta());
-
- DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet())
- .where(0).equalTo(0).flatMap(new UpdateFilter());
-
- DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
- DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
- result.writeAsText(resultPath);
-
- env.execute();
-
- // return expected result
- return "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
- + "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
- + "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
-
- }
- case 5: {
- /*
- * Test aggregator with parameter for iterateDelta
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DOP);
-
- DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
-
- DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta(
- initialSolutionSet, MAX_ITERATIONS, 0);
-
- // register aggregator
- LongSumAggregator aggr = new LongSumAggregatorWithParameter(4);
- iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
-
- DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta());
-
- DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet())
- .where(0).equalTo(0).flatMap(new UpdateFilter());
-
- DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
- DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
- result.writeAsText(resultPath);
-
- env.execute();
-
- // return expected result
- return "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
- + "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
- + "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
- }
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DOP);
+
+ DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
+ IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);
+
+ // register aggregator
+ LongSumAggregator aggr = new LongSumAggregator();
+ iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
+
+ // register convergence criterion
+ iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr,
+ new NegativeElementsConvergenceCriterionWithParam(3));
+
+ DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
+ iteration.closeWith(updatedDs).writeAsText(resultPath);
+ env.execute();
+
+ expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+ + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+ + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+ }
+
+ @Test
+ public void testAggregatorWithoutParameterForIterateDelta() throws Exception {
+ /*
+ * Test aggregator without parameter for iterateDelta
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DOP);
+
+ DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
+
+ DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta(
+ initialSolutionSet, MAX_ITERATIONS, 0);
+
+ // register aggregator
+ LongSumAggregator aggr = new LongSumAggregator();
+ iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
+
+ DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta());
+
+ DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet())
+ .where(0).equalTo(0).flatMap(new UpdateFilter());
+
+ DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
+ DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
+ result.writeAsText(resultPath);
+
+ env.execute();
+
+ expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
+ + "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
+ + "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
+ }
+
+ @Test
+ public void testAggregatorWithParameterForIterateDelta() throws Exception {
+ /*
+ * Test aggregator with parameter for iterateDelta
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DOP);
+
+ DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap());
+
+ DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta(
+ initialSolutionSet, MAX_ITERATIONS, 0);
+
+ // register aggregator
+ LongSumAggregator aggr = new LongSumAggregatorWithParameter(4);
+ iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr);
+
+ DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta());
+
+ DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet())
+ .where(0).equalTo(0).flatMap(new UpdateFilter());
+
+ DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
+ DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
+ result.writeAsText(resultPath);
+
+ env.execute();
+
+ expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
+ + "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
+ + "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
}
@SuppressWarnings("serial")
@@ -294,7 +266,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
@Override
public void open(Configuration conf) {
- aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+ aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
}
@Override
@@ -316,7 +288,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
@Override
public void open(Configuration conf) {
- aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+ aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
}
@Override
@@ -366,11 +338,11 @@ public class AggregatorsITCase extends JavaProgramTestBase {
@Override
public void open(Configuration conf) {
- aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+ aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
superstep = getIterationRuntimeContext().getSuperstepNumber();
if (superstep > 1) {
- previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+ previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(NEGATIVE_ELEMENTS_AGGR);
// check previous aggregator value
Assert.assertEquals(superstep - 1, previousAggr.getValue());
}
@@ -429,11 +401,11 @@ public class AggregatorsITCase extends JavaProgramTestBase {
@Override
public void open(Configuration conf) {
- aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+ aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
superstep = getIterationRuntimeContext().getSuperstepNumber();
if (superstep > 1) {
- previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR);
+ previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(NEGATIVE_ELEMENTS_AGGR);
// check previous aggregator value
switch(superstep) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
index 61ad863..3fbcae6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
@@ -18,135 +18,107 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
-public class AggregateITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 3;
-
- private int curProgId = config.getInteger("ProgramId", -1);
- private String resultPath;
- private String expectedResult;
-
- public AggregateITCase(Configuration config) {
- super(config);
- }
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
+public class AggregateITCase extends MultipleProgramsTestBase {
- @Override
- protected void testProgram() throws Exception {
- expectedResult = AggregateProgs.runProgram(curProgId, resultPath);
+
+ public AggregateITCase(ExecutionMode mode){
+ super(mode);
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ private String resultPath;
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
}
-
- private static class AggregateProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
- /*
+
+ @Test
+ public void testFullAggregate() throws Exception {
+ /*
* Full Aggregate
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Integer, Long>> aggregateDs = ds
- .aggregate(Aggregations.SUM, 0)
- .and(Aggregations.MAX, 1)
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Integer, Long>> aggregateDs = ds
+ .aggregate(Aggregations.SUM, 0)
+ .and(Aggregations.MAX, 1)
.project(0, 1);
-
- aggregateDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "231,6\n";
- }
- case 2: {
- /*
+
+ aggregateDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "231,6\n";
+ }
+
+ @Test
+ public void testGroupedAggregate() throws Exception {
+ /*
* Grouped Aggregate
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
- .aggregate(Aggregations.SUM, 0)
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
+ .aggregate(Aggregations.SUM, 0)
.project(1, 0);
-
- aggregateDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1\n" +
+
+ aggregateDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1\n" +
"2,5\n" +
"3,15\n" +
"4,34\n" +
"5,65\n" +
"6,111\n";
- }
- case 3: {
- /*
- * Nested Aggregate
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
- .aggregate(Aggregations.MIN, 0)
- .aggregate(Aggregations.MIN, 0)
+ }
+
+ @Test
+ public void testNestedAggregate() throws Exception {
+ /*
+ * Nested Aggregate
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
+ .aggregate(Aggregations.MIN, 0)
+ .aggregate(Aggregations.MIN, 0)
.project(0);
-
- aggregateDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1\n";
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
- }
+
+ aggregateDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1\n";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index ffc208c..b249e22 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -18,11 +18,8 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -39,478 +36,486 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
-public class CoGroupITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 13;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+public class CoGroupITCase extends MultipleProgramsTestBase {
+
+ public CoGroupITCase(ExecutionMode mode){
+ super(mode);
+ }
+
private String resultPath;
- private String expectedResult;
-
- public CoGroupITCase(Configuration config) {
- super(config);
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = CoGroupProgs.runProgram(curProgId, resultPath);
+ @Test
+ public void testCoGroupTuplesWithKeyFieldSelector() throws Exception {
+ /*
+ * CoGroup on tuples with key field selector
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroup());
+
+ coGroupDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,0\n" +
+ "2,6\n" +
+ "3,24\n" +
+ "4,60\n" +
+ "5,120\n";
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ @Test
+ public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
+ /*
+ * CoGroup on two custom type inputs with key extractors
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where(new KeySelector4()).equalTo(new
+ KeySelector5()).with(new CustomTypeCoGroup());
+
+ coGroupDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "1,0,test\n" +
+ "2,6,test\n" +
+ "3,24,test\n" +
+ "4,60,test\n" +
+ "5,120,test\n" +
+ "6,210,test\n";
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ public static class KeySelector4 implements KeySelector<CustomType, Integer> {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Integer getKey(CustomType in) {
+ return in.myInt;
+ }
+ }
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
+ public static class KeySelector5 implements KeySelector<CustomType, Integer> {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Integer getKey(CustomType in) {
+ return in.myInt;
}
-
- return toParameterList(tConfigs);
}
-
- private static class CoGroupProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
-
- /*
- * CoGroup on tuples with key field selector
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple2<Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroup());
-
- coGroupDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,0\n" +
- "2,6\n" +
- "3,24\n" +
- "4,60\n" +
- "5,120\n";
- }
- case 2: {
-
- /*
- * CoGroup on two custom type inputs with key extractors
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where(new KeySelector<CustomType, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Integer getKey(CustomType in) {
- return in.myInt;
- }
- }).equalTo(new KeySelector<CustomType, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Integer getKey(CustomType in) {
- return in.myInt;
- }
- }).with(new CustomTypeCoGroup());
-
- coGroupDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "1,0,test\n" +
- "2,6,test\n" +
- "3,24,test\n" +
- "4,60,test\n" +
- "5,120,test\n" +
- "6,210,test\n";
- }
- case 3: {
-
- /*
- * check correctness of cogroup if UDF returns left input objects
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple3ReturnLeft());
-
- coGroupDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "2,2,Hello\n" +
- "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" +
- "5,3,I am fine.\n";
-
- }
- case 4: {
-
- /*
- * check correctness of cogroup if UDF returns right input objects
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5ReturnRight());
-
- coGroupDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,0,Hallo,1\n" +
- "2,2,1,Hallo Welt,2\n" +
- "2,3,2,Hallo Welt wie,1\n" +
- "3,4,3,Hallo Welt wie gehts?,2\n" +
- "3,5,4,ABC,2\n" +
- "3,6,5,BCD,3\n";
-
- }
- case 5: {
-
- /*
- * Reduce with broadcast set
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints");
-
- coGroupDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,0,55\n" +
- "2,6,55\n" +
- "3,24,55\n" +
- "4,60,55\n" +
- "5,120,55\n";
- }
- case 6: {
-
- /*
- * CoGroup on a tuple input with key field selector and a custom type input with key extractor
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(2).equalTo(new KeySelector<CustomType, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Integer getKey(CustomType in) {
- return in.myInt;
- }
- }).with(new MixedCoGroup());
-
- coGroupDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "0,1,test\n" +
- "1,2,test\n" +
- "2,5,test\n" +
- "3,15,test\n" +
- "4,33,test\n" +
- "5,63,test\n" +
- "6,109,test\n" +
- "7,4,test\n" +
- "8,4,test\n" +
- "9,4,test\n" +
- "10,5,test\n" +
- "11,5,test\n" +
- "12,5,test\n" +
- "13,5,test\n" +
- "14,5,test\n";
-
- }
- case 7: {
-
- /*
- * CoGroup on a tuple input with key field selector and a custom type input with key extractor
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> coGroupDs = ds2.coGroup(ds).where(new KeySelector<CustomType, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Integer getKey(CustomType in) {
- return in.myInt;
- }
- }).equalTo(2).with(new MixedCoGroup2());
-
- coGroupDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "0,1,test\n" +
- "1,2,test\n" +
- "2,5,test\n" +
- "3,15,test\n" +
- "4,33,test\n" +
- "5,63,test\n" +
- "6,109,test\n" +
- "7,4,test\n" +
- "8,4,test\n" +
- "9,4,test\n" +
- "10,5,test\n" +
- "11,5,test\n" +
- "12,5,test\n" +
- "13,5,test\n" +
- "14,5,test\n";
-
- }
- case 8: {
- /*
- * CoGroup with multiple key fields
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-
- DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
- where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup());
-
- coGrouped.writeAsCsv(resultPath);
- env.execute();
-
- return "1,1,Hallo\n" +
- "2,2,Hallo Welt\n" +
- "3,2,Hallo Welt wie gehts?\n" +
- "3,2,ABC\n" +
- "5,3,HIJ\n" +
- "5,3,IJK\n";
- }
- case 9: {
- /*
- * CoGroup with multiple key fields
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-
- DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
- where(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
- return new Tuple2<Integer, Long>(t.f0, t.f4);
- }
- }).
- equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
- return new Tuple2<Integer, Long>(t.f0, t.f1);
- }
- }).with(new Tuple5Tuple3CoGroup());
-
- coGrouped.writeAsCsv(resultPath);
- env.execute();
-
- return "1,1,Hallo\n" +
- "2,2,Hallo Welt\n" +
- "3,2,Hallo Welt wie gehts?\n" +
- "3,2,ABC\n" +
- "5,3,HIJ\n" +
- "5,3,IJK\n";
- }
- case 10: {
- /*
- * CoGroup on two custom type inputs using expression keys
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup());
-
- coGroupDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "1,0,test\n" +
- "2,6,test\n" +
- "3,24,test\n" +
- "4,60,test\n" +
- "5,120,test\n" +
- "6,210,test\n";
- }
- case 11: {
- /*
- * CoGroup on two custom type inputs using expression keys
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
- DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
- DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
- .where("nestedPojo.longNumber").equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void coGroup(
- Iterable<POJO> first,
- Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
- Collector<CustomType> out) throws Exception {
- for(POJO p : first) {
- for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
- Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
- out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
- }
- }
- }
- });
- coGroupDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "-1,20000,Flink\n" +
- "-1,10000,Flink\n" +
- "-1,30000,Flink\n";
- }
- case 12: {
- /*
- * CoGroup field-selector (expression keys) + key selector function
- * The key selector is unnecessary complicated (Tuple1) ;)
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
- DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
- DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
- .where(new KeySelector<POJO, Tuple1<Long>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple1<Long> getKey(POJO value)
- throws Exception {
- return new Tuple1<Long>(value.nestedPojo.longNumber);
- }
- }).equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void coGroup(
- Iterable<POJO> first,
- Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
- Collector<CustomType> out) throws Exception {
- for(POJO p : first) {
- for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
- Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
- out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
- }
- }
- }
- });
- coGroupDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "-1,20000,Flink\n" +
- "-1,10000,Flink\n" +
- "-1,30000,Flink\n";
+
+ @Test
+ public void testCorrectnessOfCoGroupIfUDFReturnsLeftInputObjects() throws Exception {
+ /*
+ * check correctness of cogroup if UDF returns left input objects
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple3ReturnLeft());
+
+ coGroupDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "2,2,Hello\n" +
+ "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" +
+ "5,3,I am fine.\n";
+ }
+
+ @Test
+ public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() throws Exception {
+ /*
+ * check correctness of cogroup if UDF returns right input objects
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5ReturnRight());
+
+ coGroupDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,0,Hallo,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "3,4,3,Hallo Welt wie gehts?,2\n" +
+ "3,5,4,ABC,2\n" +
+ "3,6,5,BCD,3\n";
+ }
+
+ @Test
+ public void testCoGroupWithBroadcastSet() throws Exception {
+ /*
+ * Reduce with broadcast set
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints");
+
+ coGroupDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,0,55\n" +
+ "2,6,55\n" +
+ "3,24,55\n" +
+ "4,60,55\n" +
+ "5,120,55\n";
+ }
+
+ @Test
+ public void testCoGroupOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor()
+ throws Exception {
+ /*
+ * CoGroup on a tuple input with key field selector and a custom type input with key extractor
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(2).equalTo(new
+ KeySelector2()).with(new MixedCoGroup());
+
+ coGroupDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "0,1,test\n" +
+ "1,2,test\n" +
+ "2,5,test\n" +
+ "3,15,test\n" +
+ "4,33,test\n" +
+ "5,63,test\n" +
+ "6,109,test\n" +
+ "7,4,test\n" +
+ "8,4,test\n" +
+ "9,4,test\n" +
+ "10,5,test\n" +
+ "11,5,test\n" +
+ "12,5,test\n" +
+ "13,5,test\n" +
+ "14,5,test\n";
+ }
+
+ public static class KeySelector2 implements KeySelector<CustomType, Integer> {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Integer getKey(CustomType in) {
+ return in.myInt;
+ }
+ }
+
+ @Test
+ public void testCoGroupOnACustomTypeWithKeyExtractorAndATupleInputWithKeyFieldSelector()
+ throws Exception {
+ /*
+ * CoGroup on a tuple input with key field selector and a custom type input with key extractor
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> coGroupDs = ds2.coGroup(ds).where(new KeySelector3()).equalTo(2).with
+ (new MixedCoGroup2());
+
+ coGroupDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "0,1,test\n" +
+ "1,2,test\n" +
+ "2,5,test\n" +
+ "3,15,test\n" +
+ "4,33,test\n" +
+ "5,63,test\n" +
+ "6,109,test\n" +
+ "7,4,test\n" +
+ "8,4,test\n" +
+ "9,4,test\n" +
+ "10,5,test\n" +
+ "11,5,test\n" +
+ "12,5,test\n" +
+ "13,5,test\n" +
+ "14,5,test\n";
+
+ }
+
+ public static class KeySelector3 implements KeySelector<CustomType, Integer> {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Integer getKey(CustomType in) {
+ return in.myInt;
+ }
+ }
+
+ @Test
+ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception {
+ /*
+ * CoGroup with multiple key fields
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+ where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup());
+
+ coGrouped.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hallo\n" +
+ "2,2,Hallo Welt\n" +
+ "3,2,Hallo Welt wie gehts?\n" +
+ "3,2,ABC\n" +
+ "5,3,HIJ\n" +
+ "5,3,IJK\n";
+ }
+
+ @Test
+ public void testCoGroupWithMultipleKeyFieldsWithKeyExtractor() throws Exception {
+ /*
+ * CoGroup with multiple key fields
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+ where(new KeySelector7()).
+ equalTo(new KeySelector8()).with(new Tuple5Tuple3CoGroup());
+
+ coGrouped.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hallo\n" +
+ "2,2,Hallo Welt\n" +
+ "3,2,Hallo Welt wie gehts?\n" +
+ "3,2,ABC\n" +
+ "5,3,HIJ\n" +
+ "5,3,IJK\n";
+ }
+
+ public static class KeySelector7 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>,
+ Tuple2<Integer, Long>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }
+
+ public static class KeySelector8 implements KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f1);
+ }
+ }
+
+ @Test
+ public void testCoGroupTwoCustomTypeInputsWithExpressionKeys() throws Exception {
+ /*
+ * CoGroup on two custom type inputs using expression keys
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup());
+
+ coGroupDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "1,0,test\n" +
+ "2,6,test\n" +
+ "3,24,test\n" +
+ "4,60,test\n" +
+ "5,120,test\n" +
+ "6,210,test\n";
+ }
+
+ @Test
+ public void testCoGroupOnTwoCustomTypeInputsWithExpressionKeyAndFieldSelector() throws
+ Exception {
+ /*
+ * CoGroup on two custom type inputs using expression keys
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+ DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+ DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+ .where("nestedPojo.longNumber").equalTo(6).with(new CoGroup1());
+ coGroupDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "-1,20000,Flink\n" +
+ "-1,10000,Flink\n" +
+ "-1,30000,Flink\n";
+ }
+
+ public static class CoGroup1 implements CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void coGroup(
+ Iterable<POJO> first,
+ Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+ Collector<CustomType> out) throws Exception {
+ for(POJO p : first) {
+ for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+ Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+ out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+ }
}
- case 13: {
- /*
- * CoGroup field-selector (expression keys) + key selector function
- * The key selector is simple here
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
- DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
- DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
- .where(new KeySelector<POJO, Long>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Long getKey(POJO value)
- throws Exception {
- return value.nestedPojo.longNumber;
- }
- }).equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void coGroup(
- Iterable<POJO> first,
- Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
- Collector<CustomType> out) throws Exception {
- for(POJO p : first) {
- for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
- Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
- out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
- }
- }
- }
- });
- coGroupDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "-1,20000,Flink\n" +
- "-1,10000,Flink\n" +
- "-1,30000,Flink\n";
+ }
+ }
+
+ @Test
+ public void testCoGroupFieldSelectorAndComplicatedKeySelector() throws Exception {
+ /*
+ * CoGroup field-selector (expression keys) + key selector function
+ * The key selector is unnecessary complicated (Tuple1) ;)
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+ DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+ DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+ .where(new KeySelector6()).equalTo(6).with(new CoGroup3());
+ coGroupDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "-1,20000,Flink\n" +
+ "-1,10000,Flink\n" +
+ "-1,30000,Flink\n";
+
+ }
+
+ public static class KeySelector6 implements KeySelector<POJO, Tuple1<Long>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple1<Long> getKey(POJO value)
+ throws Exception {
+ return new Tuple1<Long>(value.nestedPojo.longNumber);
+ }
+ }
+
+ public static class CoGroup3 implements CoGroupFunction<POJO, Tuple7<Integer,
+ String, Integer, Integer, Long, String, Long>, CustomType> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void coGroup(
+ Iterable<POJO> first,
+ Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+ Collector<CustomType> out) throws Exception {
+ for(POJO p : first) {
+ for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+ Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+ out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+ }
}
-
- default:
- throw new IllegalArgumentException("Invalid program id");
+ }
+ }
+
+ @Test
+ public void testCoGroupFieldSelectorAndKeySelector() throws Exception {
+ /*
+ * CoGroup field-selector (expression keys) + key selector function
+ * The key selector is simple here
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+ DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+ DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+ .where(new KeySelector1()).equalTo(6).with(new CoGroup2());
+ coGroupDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "-1,20000,Flink\n" +
+ "-1,10000,Flink\n" +
+ "-1,30000,Flink\n";
+ }
+
+ public static class KeySelector1 implements KeySelector<POJO, Long> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long getKey(POJO value)
+ throws Exception {
+ return value.nestedPojo.longNumber;
+ }
+ }
+
+ public static class CoGroup2 implements CoGroupFunction<POJO, Tuple7<Integer, String,
+ Integer, Integer, Long, String, Long>, CustomType> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void coGroup(
+ Iterable<POJO> first,
+ Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+ Collector<CustomType> out) throws Exception {
+ for(POJO p : first) {
+ for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
+ Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+ out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+ }
}
-
}
-
}
-
+
public static class Tuple5CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
index 7d79ea5..bd32bfc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
@@ -18,10 +18,7 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.Collection;
-import java.util.LinkedList;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.RichCrossFunction;
@@ -32,371 +29,336 @@ import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
-public class CrossITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 11;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+public class CrossITCase extends MultipleProgramsTestBase {
+
+ public CrossITCase(ExecutionMode mode){
+ super(mode);
+ }
+
private String resultPath;
- private String expectedResult;
-
- public CrossITCase(Configuration config) {
- super(config);
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception { // each test gets its own freshly created temp file as output location
+ this.resultPath = String.valueOf(tempFolder.newFile().toURI());
}
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+
+ @After
+ public void after() throws Exception { // every test assigns 'expected' after running its job; the written output is compared here
+ compareResultsByLinesInMemory(this.expected, this.resultPath);
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = CrossProgs.runProgram(curProgId, resultPath);
+ @Test
+ public void testCorrectnessOfCrossOnTwoTupleInputs() throws Exception {
+ /*
+ * Cross of two small 5-tuple inputs combined by Tuple5Cross: one output line per element pair (9 lines expected).
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple2<Integer, String>> crossDs = ds.cross(ds2).with(new Tuple5Cross());
+
+ crossDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "0,HalloHallo\n" +
+ "1,HalloHallo Welt\n" +
+ "2,HalloHallo Welt wie\n" +
+ "1,Hallo WeltHallo\n" +
+ "2,Hallo WeltHallo Welt\n" +
+ "3,Hallo WeltHallo Welt wie\n" +
+ "2,Hallo Welt wieHallo\n" +
+ "3,Hallo Welt wieHallo Welt\n" +
+ "4,Hallo Welt wieHallo Welt wie\n";
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ @Test
+ public void testCorrectnessOfCrossIfUDFReturnsLeftInputObject() throws Exception {
+ /*
+ * Cross whose UDF (Tuple3ReturnLeft) returns the left input object: every left 3-tuple is expected once per right element.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> crossed = left.cross(right).with(new Tuple3ReturnLeft());
+
+ crossed.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n"
+ + "1,1,Hi\n"
+ + "1,1,Hi\n"
+ + "2,2,Hello\n"
+ + "2,2,Hello\n"
+ + "2,2,Hello\n"
+ + "3,2,Hello world\n"
+ + "3,2,Hello world\n"
+ + "3,2,Hello world\n";
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ @Test
+ public void testCorrectnessOfCrossIfUDFReturnsRightInputObject() throws Exception {
+ /*
+ * Cross whose UDF (Tuple5ReturnRight) returns the right input object: every right 5-tuple is expected once per left element.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = ds.cross(ds2).with(new Tuple5ReturnRight());
+
+ crossDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,0,Hallo,1\n" +
+ "1,1,0,Hallo,1\n" +
+ "1,1,0,Hallo,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "2,3,2,Hallo Welt wie,1\n";
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
}
-
- private static class CrossProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
-
- /*
- * check correctness of cross on two tuple inputs
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple2<Integer, String>> crossDs = ds.cross(ds2).with(new Tuple5Cross());
-
- crossDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "0,HalloHallo\n" +
- "1,HalloHallo Welt\n" +
- "2,HalloHallo Welt wie\n" +
- "1,Hallo WeltHallo\n" +
- "2,Hallo WeltHallo Welt\n" +
- "3,Hallo WeltHallo Welt wie\n" +
- "2,Hallo Welt wieHallo\n" +
- "3,Hallo Welt wieHallo Welt\n" +
- "4,Hallo Welt wieHallo Welt wie\n";
- }
- case 2: {
-
- /*
- * check correctness of cross if UDF returns left input object
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new Tuple3ReturnLeft());
-
- crossDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "1,1,Hi\n" +
- "1,1,Hi\n" +
- "2,2,Hello\n" +
- "2,2,Hello\n" +
- "2,2,Hello\n" +
- "3,2,Hello world\n" +
- "3,2,Hello world\n" +
- "3,2,Hello world\n";
-
- }
- case 3: {
-
- /*
- * check correctness of cross if UDF returns right input object
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = ds.cross(ds2).with(new Tuple5ReturnRight());
-
- crossDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,0,Hallo,1\n" +
- "1,1,0,Hallo,1\n" +
- "1,1,0,Hallo,1\n" +
- "2,2,1,Hallo Welt,2\n" +
- "2,2,1,Hallo Welt,2\n" +
- "2,2,1,Hallo Welt,2\n" +
- "2,3,2,Hallo Welt wie,1\n" +
- "2,3,2,Hallo Welt wie,1\n" +
- "2,3,2,Hallo Welt wie,1\n";
-
- }
- case 4: {
-
- /*
- * check correctness of cross with broadcast set
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple3<Integer, Integer, Integer>> crossDs = ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints");
-
- crossDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "2,0,55\n" +
- "3,0,55\n" +
- "3,0,55\n" +
- "3,0,55\n" +
- "4,1,55\n" +
- "4,2,55\n" +
- "3,0,55\n" +
- "4,2,55\n" +
- "4,4,55\n";
- }
- case 5: {
-
- /*
- * check correctness of crossWithHuge (only correctness of result -> should be the same as with normal cross)
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithHuge(ds2).with(new Tuple5Cross());
-
- crossDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "0,HalloHallo\n" +
- "1,HalloHallo Welt\n" +
- "2,HalloHallo Welt wie\n" +
- "1,Hallo WeltHallo\n" +
- "2,Hallo WeltHallo Welt\n" +
- "3,Hallo WeltHallo Welt wie\n" +
- "2,Hallo Welt wieHallo\n" +
- "3,Hallo Welt wieHallo Welt\n" +
- "4,Hallo Welt wieHallo Welt wie\n";
-
- }
- case 6: {
-
- /*
- * check correctness of crossWithTiny (only correctness of result -> should be the same as with normal cross)
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithTiny(ds2).with(new Tuple5Cross());
-
- crossDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "0,HalloHallo\n" +
- "1,HalloHallo Welt\n" +
- "2,HalloHallo Welt wie\n" +
- "1,Hallo WeltHallo\n" +
- "2,Hallo WeltHallo Welt\n" +
- "3,Hallo WeltHallo Welt wie\n" +
- "2,Hallo Welt wieHallo\n" +
- "3,Hallo Welt wieHallo Welt\n" +
- "4,Hallo Welt wieHallo Welt wie\n";
-
- }
- case 7: {
- /*
- * project cross on a tuple input 1
- */
+ @Test
+ public void testCorrectnessOfCrossWithBroadcastSet() throws Exception {
+ /*
+ * Cross with a UDF (Tuple5CrossBC) that receives the integer data set as broadcast variable "ints".
+ */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple6<String, Long, String, Integer, Long, Long>> crossDs = ds.cross(ds2)
- .projectFirst(2, 1)
- .projectSecond(3)
- .projectFirst(0)
- .projectSecond(4,1);
+ DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
- crossDs.writeAsCsv(resultPath);
- env.execute();
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Integer, Integer>> crossDs = ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints");
- // return expected result
- return "Hi,1,Hallo,1,1,1\n" +
- "Hi,1,Hallo Welt,1,2,2\n" +
- "Hi,1,Hallo Welt wie,1,1,3\n" +
- "Hello,2,Hallo,2,1,1\n" +
- "Hello,2,Hallo Welt,2,2,2\n" +
- "Hello,2,Hallo Welt wie,2,1,3\n" +
- "Hello world,2,Hallo,3,1,1\n" +
- "Hello world,2,Hallo Welt,3,2,2\n" +
- "Hello world,2,Hallo Welt wie,3,1,3\n";
+ crossDs.writeAsCsv(resultPath);
+ env.execute();
- }
- case 8: {
+ expected = "2,0,55\n" +
+ "3,0,55\n" +
+ "3,0,55\n" +
+ "3,0,55\n" +
+ "4,1,55\n" +
+ "4,2,55\n" +
+ "3,0,55\n" +
+ "4,2,55\n" +
+ "4,4,55\n";
+ }
+
+ @Test
+ public void testCorrectnessOfCrossWithHuge() throws Exception {
+ /*
+ * crossWithHuge must yield the same 9 pairs as a plain cross; only the result is checked, not the execution strategy.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> first = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> second = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple2<Integer, String>> pairs = first.crossWithHuge(second).with(new Tuple5Cross());
+
+ pairs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "0,HalloHallo\n"
+ + "1,HalloHallo Welt\n"
+ + "2,HalloHallo Welt wie\n"
+ + "1,Hallo WeltHallo\n"
+ + "2,Hallo WeltHallo Welt\n"
+ + "3,Hallo WeltHallo Welt wie\n"
+ + "2,Hallo Welt wieHallo\n"
+ + "3,Hallo Welt wieHallo Welt\n"
+ + "4,Hallo Welt wieHallo Welt wie\n";
+ }
+
+ @Test
+ public void testCorrectnessOfCrossWithTiny() throws Exception {
+ /*
+ * crossWithTiny must yield the same 9 pairs as a plain cross; only the result is checked, not the execution strategy.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> first = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> second = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple2<Integer, String>> pairs = first.crossWithTiny(second).with(new Tuple5Cross());
+
+ pairs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "0,HalloHallo\n"
+ + "1,HalloHallo Welt\n"
+ + "2,HalloHallo Welt wie\n"
+ + "1,Hallo WeltHallo\n"
+ + "2,Hallo WeltHallo Welt\n"
+ + "3,Hallo WeltHallo Welt wie\n"
+ + "2,Hallo Welt wieHallo\n"
+ + "3,Hallo Welt wieHallo Welt\n"
+ + "4,Hallo Welt wieHallo Welt wie\n";
+ }
- /*
- * project cross on a tuple input 2
- */
+ @Test
+ public void testProjectCrossOnATupleInput1() throws Exception{
+ /*
+ * Projection cross: output fields are first.2, first.1, second.3, first.0, second.4, second.1 (in that order).
+ */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple6<String, String, Long, Long, Long,Integer>> crossDs = ds.cross(ds2)
- .projectSecond(3)
- .projectFirst(2, 1)
- .projectSecond(4,1)
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple6<String, Long, String, Integer, Long, Long>> crossDs = ds.cross(ds2)
+ .projectFirst(2, 1)
+ .projectSecond(3)
+ .projectFirst(0)
+ .projectSecond(4,1);
+
+ crossDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "Hi,1,Hallo,1,1,1\n" +
+ "Hi,1,Hallo Welt,1,2,2\n" +
+ "Hi,1,Hallo Welt wie,1,1,3\n" +
+ "Hello,2,Hallo,2,1,1\n" +
+ "Hello,2,Hallo Welt,2,2,2\n" +
+ "Hello,2,Hallo Welt wie,2,1,3\n" +
+ "Hello world,2,Hallo,3,1,1\n" +
+ "Hello world,2,Hallo Welt,3,2,2\n" +
+ "Hello world,2,Hallo Welt wie,3,1,3\n";
+ }
+
+ @Test
+ public void testProjectCrossOnATupleInput2() throws Exception {
+ /*
+ * Projection cross starting from the second input: output fields are second.3, first.2, first.1, second.4, second.1, first.0.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple6<String, String, Long, Long, Long,Integer>> crossDs = ds.cross(ds2)
+ .projectSecond(3)
+ .projectFirst(2, 1)
+ .projectSecond(4,1)
.projectFirst(0);
- crossDs.writeAsCsv(resultPath);
- env.execute();
+ crossDs.writeAsCsv(resultPath);
+ env.execute();
- // return expected result
- return "Hallo,Hi,1,1,1,1\n" +
- "Hallo Welt,Hi,1,2,2,1\n" +
- "Hallo Welt wie,Hi,1,1,3,1\n" +
- "Hallo,Hello,2,1,1,2\n" +
- "Hallo Welt,Hello,2,2,2,2\n" +
- "Hallo Welt wie,Hello,2,1,3,2\n" +
- "Hallo,Hello world,2,1,1,3\n" +
- "Hallo Welt,Hello world,2,2,2,3\n" +
- "Hallo Welt wie,Hello world,2,1,3,3\n";
+ expected = "Hallo,Hi,1,1,1,1\n" +
+ "Hallo Welt,Hi,1,2,2,1\n" +
+ "Hallo Welt wie,Hi,1,1,3,1\n" +
+ "Hallo,Hello,2,1,1,2\n" +
+ "Hallo Welt,Hello,2,2,2,2\n" +
+ "Hallo Welt wie,Hello,2,1,3,2\n" +
+ "Hallo,Hello world,2,1,1,3\n" +
+ "Hallo Welt,Hello world,2,2,2,3\n" +
+ "Hallo Welt wie,Hello world,2,1,3,3\n";
- }
- case 9: {
- /*
- * check correctness of default cross
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> crossDs = ds.cross(ds2);
-
- crossDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
- "(1,1,Hi),(1,1,0,Hallo,1)\n" +
- "(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" +
- "(2,2,Hello),(2,2,1,Hallo Welt,2)\n" +
- "(2,2,Hello),(1,1,0,Hallo,1)\n" +
- "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
- "(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" +
- "(3,2,Hello world),(1,1,0,Hallo,1)\n" +
- "(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n";
-
- }
+ }
- case 10: {
-
- /*
- * check correctness of cross on two custom type inputs
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getSmallCustomTypeDataSet(env);
- DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
- DataSet<CustomType> crossDs = ds.cross(ds2).with(new CustomTypeCross());
-
- crossDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "1,0,HiHi\n"
- + "2,1,HiHello\n"
- + "2,2,HiHello world\n"
- + "2,1,HelloHi\n"
- + "4,2,HelloHello\n"
- + "4,3,HelloHello world\n"
- + "2,2,Hello worldHi\n"
- + "4,3,Hello worldHello\n"
- + "4,4,Hello worldHello world";
- }
-
- case 11: {
-
- /*
- * check correctness of cross a tuple input and a custom type input
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new MixedCross());
-
- crossDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "2,0,HalloHi\n" +
- "3,0,HalloHello\n" +
- "3,0,HalloHello world\n" +
- "3,0,Hallo WeltHi\n" +
- "4,1,Hallo WeltHello\n" +
- "4,2,Hallo WeltHello world\n" +
- "3,0,Hallo Welt wieHi\n" +
- "4,2,Hallo Welt wieHello\n" +
- "4,4,Hallo Welt wieHello world\n";
-
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
-
- }
-
+ @Test
+ public void testCorrectnessOfDefaultCross() throws Exception {
+ /*
+ * cross() without a UDF emits Tuple2(left, right) pairs, written as "(left),(right)" lines.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> pairs = left.cross(right);
+
+ pairs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n"
+ + "(1,1,Hi),(1,1,0,Hallo,1)\n"
+ + "(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n"
+ + "(2,2,Hello),(2,2,1,Hallo Welt,2)\n"
+ + "(2,2,Hello),(1,1,0,Hallo,1)\n"
+ + "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n"
+ + "(3,2,Hello world),(2,2,1,Hallo Welt,2)\n"
+ + "(3,2,Hello world),(1,1,0,Hallo,1)\n"
+ + "(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n";
+ }
+
+ @Test
+ public void testCorrectnessOfCrossOnTwoCustomTypeInputs() throws Exception {
+ /*
+ * Cross of two CustomType inputs combined by CustomTypeCross; the result is written with writeAsText.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> left = CollectionDataSets.getSmallCustomTypeDataSet(env);
+ DataSet<CustomType> right = CollectionDataSets.getSmallCustomTypeDataSet(env);
+ DataSet<CustomType> crossed = left.cross(right).with(new CustomTypeCross());
+
+ crossed.writeAsText(resultPath);
+ env.execute();
+
+ expected = "1,0,HiHi\n" +
+ "2,1,HiHello\n" +
+ "2,2,HiHello world\n" +
+ "2,1,HelloHi\n" +
+ "4,2,HelloHello\n" +
+ "4,3,HelloHello world\n" +
+ "2,2,Hello worldHi\n" +
+ "4,3,Hello worldHello\n" +
+ "4,4,Hello worldHello world";
+ }
+
+ @Test
+ public void testCorrectnessOfCrossATupleInputAndACustomTypeInput() throws Exception {
+ /*
+ * Cross of a 5-tuple input with a CustomType input, combined by MixedCross into 3-tuples.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> tuples = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<CustomType> customs = CollectionDataSets.getSmallCustomTypeDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> mixed = tuples.cross(customs).with(new MixedCross());
+
+ mixed.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "2,0,HalloHi\n"
+ + "3,0,HalloHello\n"
+ + "3,0,HalloHello world\n"
+ + "3,0,Hallo WeltHi\n"
+ + "4,1,Hallo WeltHello\n"
+ + "4,2,Hallo WeltHello world\n"
+ + "3,0,Hallo Welt wieHi\n"
+ + "4,2,Hallo Welt wieHello\n"
+ + "4,4,Hallo Welt wieHello world\n";
}
public static class Tuple5Cross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> {