You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/13 12:39:46 UTC

[2/6] flink git commit: [FLINK-9503] Migrate integration tests for iterative aggregators

[FLINK-9503] Migrate integration tests for iterative aggregators

This closes #6129.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d783c628
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d783c628
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d783c628

Branch: refs/heads/master
Commit: d783c6282e7362fae29b35e61b1522912eab2c36
Parents: fc49801
Author: yanghua <ya...@gmail.com>
Authored: Thu Jun 7 00:23:25 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200

----------------------------------------------------------------------
 .../aggregators/AggregatorsITCase.java          | 95 ++++++++------------
 1 file changed, 39 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d783c628/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index bd42ac2..e449ab8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -35,10 +35,8 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -48,6 +46,9 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.FileWriter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
@@ -63,35 +64,24 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	private static final int parallelism = 2;
 	private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
 
-	private static String testString = "Et tu, Brute?";
-	private static String testName = "testing_caesar";
-	private static String testPath;
-
 	public AggregatorsITCase(TestExecutionMode mode){
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected;
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
 
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
+	@Test
+	public void testDistributedCacheWithIterations() throws Exception{
+		final String testString = "Et tu, Brute?";
+		final String testName = "testing_caesar";
 
-	@Before
-	public void before() throws Exception{
 		final File folder = tempFolder.newFolder();
 		final File resultFile = new File(folder, UUID.randomUUID().toString());
-		testPath = resultFile.toString();
-		resultPath = resultFile.toURI().toString();
-	}
 
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
+		String testPath = resultFile.toString();
+		String resultPath = resultFile.toURI().toString();
 
-	@Test
-	public void testDistributedCacheWithIterations() throws Exception{
 		File tempFile = new File(testPath);
 		try (FileWriter writer = new FileWriter(tempFile)) {
 			writer.write(testString);
@@ -117,7 +107,6 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 			}
 		}).withBroadcastSet(solution, "SOLUTION")).output(new DiscardingOutputFormat<Long>());
 		env.execute();
-		expected = testString; // this will be a useless verification now.
 	}
 
 	@Test
@@ -141,12 +130,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				new NegativeElementsConvergenceCriterion());
 
 		DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
-		iteration.closeWith(updatedDs).writeAsText(resultPath);
-		env.execute();
+		List<Integer> result = iteration.closeWith(updatedDs).collect();
+		Collections.sort(result);
 
-		expected =  "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
-				+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
-				+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+		List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -170,12 +159,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				new NegativeElementsConvergenceCriterion());
 
 		DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
-		iteration.closeWith(updatedDs).writeAsText(resultPath);
-		env.execute();
+		List<Integer> result = iteration.closeWith(updatedDs).collect();
+		Collections.sort(result);
 
-		expected =  "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
-				+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
-				+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+		List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -199,12 +188,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				new NegativeElementsConvergenceCriterionWithParam(3));
 
 		DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
-		iteration.closeWith(updatedDs).writeAsText(resultPath);
-		env.execute();
+		List<Integer> result = iteration.closeWith(updatedDs).collect();
+		Collections.sort(result);
 
-		expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
-				+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
-				+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+		List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -231,14 +220,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				.where(0).equalTo(0).flatMap(new UpdateFilter());
 
 		DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
-		DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
-		result.writeAsText(resultPath);
+		List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+		Collections.sort(result);
 
-		env.execute();
+		List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);
 
-		expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
-				+ "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
-				+ "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -265,14 +252,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				.where(0).equalTo(0).flatMap(new UpdateFilter());
 
 		DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
-		DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
-		result.writeAsText(resultPath);
+		List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+		Collections.sort(result);
 
-		env.execute();
+		List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);
 
-		expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
-				+ "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
-				+ "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
+		assertEquals(result, expected);
 	}
 
 	@Test
@@ -303,14 +288,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				.where(0).equalTo(0).projectFirst(0, 1);
 
 		DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
-		DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
-		result.writeAsText(resultPath);
+		List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+		Collections.sort(result);
 
-		env.execute();
+		List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
 
-		expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
-				+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
-				+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+		assertEquals(expected, result);
 	}
 
 	@SuppressWarnings("serial")