You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/13 12:39:46 UTC
[2/6] flink git commit: [FLINK-9503] Migrate integration tests for
iterative aggregators
[FLINK-9503] Migrate integration tests for iterative aggregators
This closes #6129.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d783c628
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d783c628
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d783c628
Branch: refs/heads/master
Commit: d783c6282e7362fae29b35e61b1522912eab2c36
Parents: fc49801
Author: yanghua <ya...@gmail.com>
Authored: Thu Jun 7 00:23:25 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200
----------------------------------------------------------------------
.../aggregators/AggregatorsITCase.java | 95 ++++++++------------
1 file changed, 39 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d783c628/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index bd42ac2..e449ab8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -35,10 +35,8 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
@@ -48,6 +46,9 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -63,35 +64,24 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
private static final int parallelism = 2;
private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
- private static String testString = "Et tu, Brute?";
- private static String testName = "testing_caesar";
- private static String testPath;
-
public AggregatorsITCase(TestExecutionMode mode){
super(mode);
}
- private String resultPath;
- private String expected;
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
+ @Test
+ public void testDistributedCacheWithIterations() throws Exception{
+ final String testString = "Et tu, Brute?";
+ final String testName = "testing_caesar";
- @Before
- public void before() throws Exception{
final File folder = tempFolder.newFolder();
final File resultFile = new File(folder, UUID.randomUUID().toString());
- testPath = resultFile.toString();
- resultPath = resultFile.toURI().toString();
- }
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
+ String testPath = resultFile.toString();
+ String resultPath = resultFile.toURI().toString();
- @Test
- public void testDistributedCacheWithIterations() throws Exception{
File tempFile = new File(testPath);
try (FileWriter writer = new FileWriter(tempFile)) {
writer.write(testString);
@@ -117,7 +107,6 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
}
}).withBroadcastSet(solution, "SOLUTION")).output(new DiscardingOutputFormat<Long>());
env.execute();
- expected = testString; // this will be a useless verification now.
}
@Test
@@ -141,12 +130,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
new NegativeElementsConvergenceCriterion());
DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
- iteration.closeWith(updatedDs).writeAsText(resultPath);
- env.execute();
+ List<Integer> result = iteration.closeWith(updatedDs).collect();
+ Collections.sort(result);
- expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
- + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+ List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+ assertEquals(expected, result);
}
@Test
@@ -170,12 +159,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
new NegativeElementsConvergenceCriterion());
DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
- iteration.closeWith(updatedDs).writeAsText(resultPath);
- env.execute();
+ List<Integer> result = iteration.closeWith(updatedDs).collect();
+ Collections.sort(result);
- expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
- + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+ List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+ assertEquals(expected, result);
}
@Test
@@ -199,12 +188,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
new NegativeElementsConvergenceCriterionWithParam(3));
DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
- iteration.closeWith(updatedDs).writeAsText(resultPath);
- env.execute();
+ List<Integer> result = iteration.closeWith(updatedDs).collect();
+ Collections.sort(result);
- expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
- + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+ List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+ assertEquals(expected, result);
}
@Test
@@ -231,14 +220,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
.where(0).equalTo(0).flatMap(new UpdateFilter());
DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
- DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
- result.writeAsText(resultPath);
+ List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+ Collections.sort(result);
- env.execute();
+ List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);
- expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
- + "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
- + "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
+ assertEquals(expected, result);
}
@Test
@@ -265,14 +252,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
.where(0).equalTo(0).flatMap(new UpdateFilter());
DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
- DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
- result.writeAsText(resultPath);
+ List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+ Collections.sort(result);
- env.execute();
+ List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);
- expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
- + "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
- + "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
+ assertEquals(result, expected);
}
@Test
@@ -303,14 +288,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
.where(0).equalTo(0).projectFirst(0, 1);
DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
- DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
- result.writeAsText(resultPath);
+ List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+ Collections.sort(result);
- env.execute();
+ List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
- expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
- + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+ assertEquals(expected, result);
}
@SuppressWarnings("serial")