You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/10 20:39:48 UTC
[8/8] flink git commit: [hotfix] [tests] Share proper test mini cluster for tests in DistributedCacheTest
[hotfix] [tests] Share proper test mini cluster for tests in DistributedCacheTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06216334
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06216334
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06216334
Branch: refs/heads/release-1.3
Commit: 062163344f8785733503837b83a4d063baa3c4cf
Parents: 1b21737
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 10 20:57:08 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 10 21:08:53 2017 +0200
----------------------------------------------------------------------
.../distributedCache/DistributedCacheTest.java | 56 ++++++++++++++++----
1 file changed, 46 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/06216334/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
index 19bcf76..21aa40a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
@@ -22,19 +22,30 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.assertTrue;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
-import java.io.*;
+import static org.junit.Assert.assertTrue;
-import java.util.*;
+public class DistributedCacheTest extends AbstractTestBase {
-public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
public static final String data
= "machen\n"
+ "zeit\n"
@@ -42,6 +53,31 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
+ "keiner\n"
+ "meine\n";
+ private static final int PARALLELISM = 4;
+
+ private static LocalFlinkMiniCluster cluster;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = TestBaseUtils.startCluster(1, PARALLELISM, false, false, true);
+ TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
+ TestEnvironment.setAsContext(cluster, PARALLELISM);
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ TestStreamEnvironment.unsetAsContext();
+ TestEnvironment.unsetAsContext();
+ TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+ }
+
+ // ------------------------------------------------------------------------
+
+ public DistributedCacheTest() {
+ super(new Configuration());
+ }
+
+ // ------------------------------------------------------------------------
@Test
public void testStreamingDistributedCache() throws Exception {
@@ -68,12 +104,12 @@ public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
@Override
public void open(Configuration conf) throws IOException {
File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
- BufferedReader reader = new BufferedReader(new FileReader(file));
- String tempString;
- while ((tempString = reader.readLine()) != null) {
- wordList.add(tempString);
+ try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
+ String tempString;
+ while ((tempString= reader.readLine()) != null) {
+ wordList.add(tempString);
+ }
}
- reader.close();
}
@Override