You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/10 20:39:48 UTC

[8/8] flink git commit: [hotfix] [tests] Share proper test mini cluster for tests in DistributedCacheTest

[hotfix] [tests] Share proper test mini cluster for tests in DistributedCacheTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06216334
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06216334
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06216334

Branch: refs/heads/release-1.3
Commit: 062163344f8785733503837b83a4d063baa3c4cf
Parents: 1b21737
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 10 20:57:08 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 10 21:08:53 2017 +0200

----------------------------------------------------------------------
 .../distributedCache/DistributedCacheTest.java  | 56 ++++++++++++++++----
 1 file changed, 46 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06216334/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
index 19bcf76..21aa40a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
@@ -22,19 +22,30 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
-import java.io.*;
+import static org.junit.Assert.assertTrue;
 
-import java.util.*;
 
+public class DistributedCacheTest extends AbstractTestBase {
 
-public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
 	public static final String data
 			= "machen\n"
 			+ "zeit\n"
@@ -42,6 +53,31 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
 			+ "keiner\n"
 			+ "meine\n";
 
+	private static final int PARALLELISM = 4;
+
+	private static LocalFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		cluster = TestBaseUtils.startCluster(1, PARALLELISM, false, false, true);
+		TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
+		TestEnvironment.setAsContext(cluster, PARALLELISM);
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		TestStreamEnvironment.unsetAsContext();
+		TestEnvironment.unsetAsContext();
+		TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public DistributedCacheTest() {
+		super(new Configuration());
+	}
+	
+	// ------------------------------------------------------------------------
 
 	@Test
 	public void testStreamingDistributedCache() throws Exception {
@@ -68,12 +104,12 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
 		@Override
 		public void open(Configuration conf) throws IOException {
 			File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
-			BufferedReader reader = new BufferedReader(new FileReader(file));
-			String tempString;
-			while ((tempString = reader.readLine()) != null) {
-				wordList.add(tempString);
+			try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
+				String tempString;
+				while ((tempString= reader.readLine()) != null) {
+					wordList.add(tempString);
+				}
 			}
-			reader.close();
 		}
 
 		@Override