Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/09 08:14:47 UTC

[4/7] flink git commit: [FLINK-7909] Unify Flink test bases

[FLINK-7909] Unify Flink test bases

Introduce a MiniClusterResource that is used by the AbstractTestBase to start
and shut down a FlinkMiniCluster. Additionally, this resource registers the proper
StreamExecutionEnvironment and ExecutionEnvironment, which are now the only way
for tests to start jobs. This change thus makes it possible to centrally control
which Flink cluster is started for all test bases.
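
For illustration, a minimal sketch of what a test built on the unified base could
look like (the test class, data, and pipeline below are hypothetical; only
AbstractTestBase and the environment wiring come from this change):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.test.util.AbstractTestBase;

    import org.junit.Test;

    public class MyStreamingITCase extends AbstractTestBase {

        @Test
        public void testJob() throws Exception {
            // Returns the TestStreamEnvironment registered by the
            // MiniClusterResource inside AbstractTestBase, so the job
            // runs against the shared mini cluster.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                .print();

            env.execute();
        }
    }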


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c5c8325
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c5c8325
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c5c8325

Branch: refs/heads/master
Commit: 3c5c8325b6641854fd86596ffe1fc5641a6757c2
Parents: a03cdfa
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Oct 24 11:32:05 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jan 9 08:05:51 2018 +0100

----------------------------------------------------------------------
 .../examples/windowing/TopSpeedWindowing.java   |   1 +
 .../TopSpeedWindowingExampleITCase.java         |   1 -
 .../TopSpeedWindowingExampleITCase.java         |   1 -
 .../examples/StreamingExamplesITCase.scala      |  21 +-
 .../util/StreamingMultipleProgramsTestBase.java |  27 ---
 .../util/StreamingProgramTestBase.java          |  80 ++-----
 .../flink/test/util/AbstractTestBase.java       | 100 ++++-----
 .../flink/test/util/JavaProgramTestBase.java    | 161 ++++++--------
 .../flink/test/util/MiniClusterResource.java    | 144 ++++++++++++
 .../apache/flink/test/util/TestEnvironment.java |   3 +-
 .../distributedcache/DistributedCacheTest.java  |  30 ---
 .../test/example/scala/WordCountITCase.java     |   6 -
 ...ectedComponentsWithDeferredUpdateITCase.java |  20 +-
 .../DependencyConnectedComponentsITCase.java    |   4 -
 .../flink/test/operators/TypeHintITCase.java    |  21 +-
 .../runtime/NetworkStackThroughputITCase.java   | 219 +++++++++----------
 16 files changed, 406 insertions(+), 433 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index ee06cd4..7543bab 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -55,6 +55,7 @@ public class TopSpeedWindowing {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.getConfig().setGlobalJobParameters(params);
+		env.setParallelism(1);
 
 		@SuppressWarnings({"rawtypes", "serial"})
 		DataStream<Tuple4<Integer, Integer, Double, Long>> carData;

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
index 0d869b8..c2f3164 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -31,7 +31,6 @@ public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void preSubmit() throws Exception {
-		setParallelism(1); //needed to ensure total ordering for windows
 		textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
 		resultPath = getTempDirPath("result");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
index 8dbfe88..db27c60 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -30,7 +30,6 @@ public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void preSubmit() throws Exception {
-		setParallelism(1); //needed to ensure total ordering for windows
 		textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
 		resultPath = getTempDirPath("result");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index 0a26a35..24d1444 100644
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -39,8 +39,7 @@ import org.apache.flink.streaming.scala.examples.wordcount.WordCount
 import org.apache.flink.streaming.test.examples.join.WindowJoinData
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.TestBaseUtils
-
+import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
 import org.junit.Test
 
 /**
@@ -50,8 +49,8 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testIterateExample(): Unit = {
-    val inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
-    val resultPath = getTempDirPath("result")
+    val inputPath = AbstractTestBase.createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
+    val resultPath = AbstractTestBase.getTempDirPath("result")
 
     // the example is inherently non-deterministic. The iteration timeout of 5000 ms
     // is frequently not enough to make the test run stable on CI infrastructure
@@ -100,14 +99,14 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testIncrementalLearningSkeleton(): Unit = {
-    val resultPath = getTempDirPath("result")
+    val resultPath = AbstractTestBase.getTempDirPath("result")
     IncrementalLearningSkeleton.main(Array("--output", resultPath))
     TestBaseUtils.compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath)
   }
 
   @Test
   def testTwitterExample(): Unit = {
-    val resultPath = getTempDirPath("result")
+    val resultPath = AbstractTestBase.getTempDirPath("result")
     TwitterExample.main(Array("--output", resultPath))
     TestBaseUtils.compareResultsByLinesInMemory(
       TwitterExampleData.STREAMING_COUNTS_AS_TUPLES,
@@ -116,7 +115,7 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testSessionWindowing(): Unit = {
-    val resultPath = getTempDirPath("result")
+    val resultPath = AbstractTestBase.getTempDirPath("result")
     SessionWindowing.main(Array("--output", resultPath))
     TestBaseUtils.compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath)
   }
@@ -125,8 +124,8 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
   def testWindowWordCount(): Unit = {
     val windowSize = "250"
     val slideSize = "150"
-    val textPath = createTempFile("text.txt", WordCountData.TEXT)
-    val resultPath = getTempDirPath("result")
+    val textPath = AbstractTestBase.createTempFile("text.txt", WordCountData.TEXT)
+    val resultPath = AbstractTestBase.getTempDirPath("result")
 
     WindowWordCount.main(Array(
       "--input", textPath,
@@ -143,8 +142,8 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testWordCount(): Unit = {
-    val textPath = createTempFile("text.txt", WordCountData.TEXT)
-    val resultPath = getTempDirPath("result")
+    val textPath = AbstractTestBase.createTempFile("text.txt", WordCountData.TEXT)
+    val resultPath = AbstractTestBase.getTempDirPath("result")
 
     WordCount.main(Array(
       "--input", textPath,

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 8324c3a..aa6e618 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -18,13 +18,8 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,29 +58,7 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
 
 	protected static final int DEFAULT_PARALLELISM = 4;
 
-	protected static LocalFlinkMiniCluster cluster;
-
-	public StreamingMultipleProgramsTestBase() {
-		super(new Configuration());
-	}
-
 	protected static final Logger LOG = LoggerFactory.getLogger(StreamingMultipleProgramsTestBase.class);
 
-	// ------------------------------------------------------------------------
-	//  Cluster setup & teardown
-	// ------------------------------------------------------------------------
-
-	@BeforeClass
-	public static void setup() throws Exception {
-		LOG.info("In StreamingMultipleProgramsTestBase: Starting FlinkMiniCluster ");
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true);
-		TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
-	}
 
-	@AfterClass
-	public static void teardown() throws Exception {
-		LOG.info("In StreamingMultipleProgramsTestBase: Closing FlinkMiniCluster ");
-		TestStreamEnvironment.unsetAsContext();
-		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
index e48ae6e..7093f5d 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.Test;
@@ -32,24 +31,6 @@ import static org.junit.Assert.fail;
  */
 public abstract class StreamingProgramTestBase extends AbstractTestBase {
 
-	protected static final int DEFAULT_PARALLELISM = 4;
-
-	private int parallelism;
-
-	public StreamingProgramTestBase() {
-		super(new Configuration());
-		setParallelism(DEFAULT_PARALLELISM);
-	}
-
-	public void setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-		setTaskManagerNumSlots(parallelism);
-	}
-
-	public int getParallelism() {
-		return parallelism;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Methods to create the test program and for pre- and post- test work
 	// --------------------------------------------------------------------------------------------
@@ -66,47 +47,34 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
 
 	@Test
 	public void testJob() throws Exception {
+		// pre-submit
 		try {
-			// pre-submit
-			try {
-				preSubmit();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Pre-submit work caused an error: " + e.getMessage());
-			}
-
-			// prepare the test environment
-			startCluster();
-
-			TestStreamEnvironment.setAsContext(this.executor, getParallelism());
+			preSubmit();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Pre-submit work caused an error: " + e.getMessage());
+		}
 
-			// call the test program
-			try {
-				testProgram();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Error while calling the test program: " + e.getMessage());
-			}
-			finally {
-				TestStreamEnvironment.unsetAsContext();
-			}
+		// call the test program
+		try {
+			testProgram();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Error while calling the test program: " + e.getMessage());
+		}
 
-			// post-submit
-			try {
-				postSubmit();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Post-submit work caused an error: " + e.getMessage());
-			}
+		// post-submit
+		try {
+			postSubmit();
 		}
-		finally {
-			stopCluster();
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Post-submit work caused an error: " + e.getMessage());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index dd0005b..bbd250d 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -19,97 +19,75 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.FileUtils;
 
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Objects;
-
-import scala.concurrent.duration.FiniteDuration;
 
 /**
- * A base class for tests that run test programs in a Flink mini cluster.
+ * Base class for unit tests that run multiple tests and want to reuse the same
+ * Flink cluster. This saves a significant amount of time, since the startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually dominates
+ * the execution of the actual tests.
+ *
+ * <p>To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the StreamExecutionEnvironment from
+ * the context:
+ *
+ * <pre>
+ *   {@literal @}Test
+ *   public void someTest() {
+ *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   {@literal @}Test
+ *   public void anotherTest() {
+ *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * </pre>
  */
 public abstract class AbstractTestBase extends TestBaseUtils {
 
-	/** Configuration to start the testing cluster with. */
-	protected final Configuration config;
-
-	private final FiniteDuration timeout;
+	protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);
 
-	protected int taskManagerNumSlots = 1;
+	private static final int DEFAULT_PARALLELISM = 4;
 
-	protected int numTaskManagers = 1;
+	@ClassRule
+	public static MiniClusterResource miniClusterResource = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			new Configuration(),
+			1,
+			DEFAULT_PARALLELISM));
 
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
-	/** The mini cluster that runs the test programs. */
-	protected LocalFlinkMiniCluster executor;
-
-	public AbstractTestBase(Configuration config) {
-		this.config = Objects.requireNonNull(config);
-
-		timeout = AkkaUtils.getTimeout(config);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Local Test Cluster Life Cycle
-	// --------------------------------------------------------------------------------------------
-
-	public void startCluster() throws Exception {
-		this.executor = startCluster(
-			numTaskManagers,
-			taskManagerNumSlots,
-			false,
-			false,
-			true);
-	}
-
-	public void stopCluster() throws Exception {
-		stopCluster(executor, timeout);
-	}
-
-	//------------------
-	// Accessors
-	//------------------
-
-	public int getTaskManagerNumSlots() {
-		return taskManagerNumSlots;
-	}
-
-	public void setTaskManagerNumSlots(int taskManagerNumSlots) {
-		this.taskManagerNumSlots = taskManagerNumSlots;
-	}
-
-	public int getNumTaskManagers() {
-		return numTaskManagers;
-	}
-
-	public void setNumTaskManagers(int numTaskManagers) {
-		this.numTaskManagers = numTaskManagers;
-	}
 
 	// --------------------------------------------------------------------------------------------
 	//  Temporary File Utilities
 	// --------------------------------------------------------------------------------------------
 
-	public String getTempDirPath(String dirName) throws IOException {
+	public static String getTempDirPath(String dirName) throws IOException {
 		File f = createAndRegisterTempFile(dirName);
 		return f.toURI().toString();
 	}
 
-	public String getTempFilePath(String fileName) throws IOException {
+	public static String getTempFilePath(String fileName) throws IOException {
 		File f = createAndRegisterTempFile(fileName);
 		return f.toURI().toString();
 	}
 
-	public String createTempFile(String fileName, String contents) throws IOException {
+	public static String createTempFile(String fileName, String contents) throws IOException {
 		File f = createAndRegisterTempFile(fileName);
 		if (!f.getParentFile().exists()) {
 			f.getParentFile().mkdirs();
@@ -119,7 +97,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 		return f.toURI().toString();
 	}
 
-	public File createAndRegisterTempFile(String fileName) throws IOException {
+	public static File createAndRegisterTempFile(String fileName) throws IOException {
 		return new File(TEMPORARY_FOLDER.newFolder(), fileName);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 3a01af8..de536d8 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -19,7 +19,7 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.ExecutionEnvironment;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -34,12 +34,8 @@ import org.junit.Test;
  */
 public abstract class JavaProgramTestBase extends AbstractTestBase {
 
-	private static final int DEFAULT_PARALLELISM = 4;
-
 	private JobExecutionResult latestExecutionResult;
 
-	private int parallelism = DEFAULT_PARALLELISM;
-
 	/**
 	 * The number of times a test should be repeated.
 	 *
@@ -50,26 +46,12 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 
 	private boolean isCollectionExecution;
 
-	public JavaProgramTestBase() {
-		this(new Configuration());
-	}
-
-	public JavaProgramTestBase(Configuration config) {
-		super(config);
-		setTaskManagerNumSlots(parallelism);
-	}
-
-	public void setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-		setTaskManagerNumSlots(parallelism);
-	}
-
 	public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
 		this.numberOfTestRepetitions = numberOfTestRepetitions;
 	}
 
 	public int getParallelism() {
-		return isCollectionExecution ? 1 : parallelism;
+		return isCollectionExecution ? 1 : miniClusterResource.getNumberSlots();
 	}
 
 	public JobExecutionResult getLatestExecutionResult() {
@@ -102,53 +84,47 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	public void testJobWithObjectReuse() throws Exception {
 		isCollectionExecution = false;
 
-		startCluster();
-
+		// pre-submit
 		try {
-			// pre-submit
-			try {
-				preSubmit();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				Assert.fail("Pre-submit work caused an error: " + e.getMessage());
-			}
+			preSubmit();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Pre-submit work caused an error: " + e.getMessage());
+		}
 
-			// prepare the test environment
-			TestEnvironment env = new TestEnvironment(this.executor, this.parallelism, false);
-			env.getConfig().enableObjectReuse();
-			env.setAsContext();
-
-			// Possibly run the test multiple times
-			for (int i = 0; i < numberOfTestRepetitions; i++) {
-				// call the test program
-				try {
-					testProgram();
-					this.latestExecutionResult = env.getLastJobExecutionResult();
-				}
-				catch (Exception e) {
-					System.err.println(e.getMessage());
-					e.printStackTrace();
-					Assert.fail("Error while calling the test program: " + e.getMessage());
-				}
-
-				Assert.assertNotNull("The test program never triggered an execution.",
-						this.latestExecutionResult);
-			}
+		// This only works because the underlying ExecutionEnvironment is a TestEnvironment
+		// We should fix that we are able to get access to the latest execution result from a different
+		// execution environment and how the object reuse mode is enabled
+		TestEnvironment env = miniClusterResource.getTestEnvironment();
+		env.getConfig().enableObjectReuse();
 
-			// post-submit
+		// Possibly run the test multiple times
+		for (int i = 0; i < numberOfTestRepetitions; i++) {
+			// call the test program
 			try {
-				postSubmit();
+				testProgram();
+				this.latestExecutionResult = env.getLastJobExecutionResult();
 			}
 			catch (Exception e) {
 				System.err.println(e.getMessage());
 				e.printStackTrace();
-				Assert.fail("Post-submit work caused an error: " + e.getMessage());
+				Assert.fail("Error while calling the test program: " + e.getMessage());
 			}
-		} finally {
-			stopCluster();
-			TestEnvironment.unsetAsContext();
+
+			Assert.assertNotNull("The test program never triggered an execution.",
+					this.latestExecutionResult);
+		}
+
+		// post-submit
+		try {
+			postSubmit();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Post-submit work caused an error: " + e.getMessage());
 		}
 	}
 
@@ -156,52 +132,47 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	public void testJobWithoutObjectReuse() throws Exception {
 		isCollectionExecution = false;
 
-		startCluster();
+		// pre-submit
 		try {
-			// pre-submit
-			try {
-				preSubmit();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				Assert.fail("Pre-submit work caused an error: " + e.getMessage());
-			}
+			preSubmit();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Pre-submit work caused an error: " + e.getMessage());
+		}
 
-			// prepare the test environment
-			TestEnvironment env = new TestEnvironment(this.executor, this.parallelism, false);
-			env.getConfig().disableObjectReuse();
-			env.setAsContext();
-
-			// Possibly run the test multiple times
-			for (int i = 0; i < numberOfTestRepetitions; i++) {
-				// call the test program
-				try {
-					testProgram();
-					this.latestExecutionResult = env.getLastJobExecutionResult();
-				}
-				catch (Exception e) {
-					System.err.println(e.getMessage());
-					e.printStackTrace();
-					Assert.fail("Error while calling the test program: " + e.getMessage());
-				}
-
-				Assert.assertNotNull("The test program never triggered an execution.",
-						this.latestExecutionResult);
-			}
+		// This only works because the underlying ExecutionEnvironment is a TestEnvironment
+		// We should fix that we are able to get access to the latest execution result from a different
+		// execution environment and how the object reuse mode is enabled
+		ExecutionEnvironment env = miniClusterResource.getTestEnvironment();
+		env.getConfig().disableObjectReuse();
 
-			// post-submit
+		// Possibly run the test multiple times
+		for (int i = 0; i < numberOfTestRepetitions; i++) {
+			// call the test program
 			try {
-				postSubmit();
+				testProgram();
+				this.latestExecutionResult = env.getLastJobExecutionResult();
 			}
 			catch (Exception e) {
 				System.err.println(e.getMessage());
 				e.printStackTrace();
-				Assert.fail("Post-submit work caused an error: " + e.getMessage());
+				Assert.fail("Error while calling the test program: " + e.getMessage());
 			}
-		} finally {
-			stopCluster();
-			TestEnvironment.unsetAsContext();
+
+			Assert.assertNotNull("The test program never triggered an execution.",
+					this.latestExecutionResult);
+		}
+
+		// post-submit
+		try {
+			postSubmit();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Post-submit work caused an error: " + e.getMessage());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
new file mode 100644
index 0000000..c8872ac
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts a Flink mini cluster as a resource and registers the respective
+ * ExecutionEnvironment and StreamExecutionEnvironment.
+ */
+public class MiniClusterResource extends ExternalResource {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class);
+
+	private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
+
+	private LocalFlinkMiniCluster localFlinkMiniCluster;
+
+	private int numberSlots = -1;
+
+	private TestEnvironment executionEnvironment;
+
+	public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+		this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
+	}
+
+	public int getNumberSlots() {
+		return numberSlots;
+	}
+
+	public TestEnvironment getTestEnvironment() {
+		return executionEnvironment;
+	}
+
+	@Override
+	public void before() throws Exception {
+		localFlinkMiniCluster = TestBaseUtils.startCluster(
+			miniClusterResourceConfiguration.getNumberTaskManagers(),
+			miniClusterResourceConfiguration.getNumberSlotsPerTaskManager(),
+			false,
+			false,
+			true);
+
+		numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers();
+
+		executionEnvironment = new TestEnvironment(localFlinkMiniCluster, numberSlots, false);
+		executionEnvironment.setAsContext();
+		TestStreamEnvironment.setAsContext(localFlinkMiniCluster, numberSlots);
+	}
+
+	@Override
+	public void after() {
+		if (localFlinkMiniCluster != null) {
+			try {
+				TestBaseUtils.stopCluster(
+					localFlinkMiniCluster,
+					FutureUtils.toFiniteDuration(miniClusterResourceConfiguration.getShutdownTimeout()));
+			} catch (Exception e) {
+				LOG.warn("Could not properly shut down the Flink mini cluster.", e);
+			}
+
+			TestStreamEnvironment.unsetAsContext();
+			TestEnvironment.unsetAsContext();
+			localFlinkMiniCluster = null;
+		}
+	}
+
+	/**
+	 * Mini cluster resource configuration object.
+	 */
+	public static class MiniClusterResourceConfiguration {
+		private final Configuration configuration;
+
+		private final int numberTaskManagers;
+
+		private final int numberSlotsPerTaskManager;
+
+		private final Time shutdownTimeout;
+
+		public MiniClusterResourceConfiguration(
+				Configuration configuration,
+				int numberTaskManagers,
+				int numberSlotsPerTaskManager) {
+			this(
+				configuration,
+				numberTaskManagers,
+				numberSlotsPerTaskManager,
+				AkkaUtils.getTimeoutAsTime(configuration));
+		}
+
+		public MiniClusterResourceConfiguration(
+				Configuration configuration,
+				int numberTaskManagers,
+				int numberSlotsPerTaskManager,
+				Time shutdownTimeout) {
+			this.configuration = Preconditions.checkNotNull(configuration);
+			this.numberTaskManagers = numberTaskManagers;
+			this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
+			this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
+		}
+
+		public Configuration getConfiguration() {
+			return configuration;
+		}
+
+		public int getNumberTaskManagers() {
+			return numberTaskManagers;
+		}
+
+		public int getNumberSlotsPerTaskManager() {
+			return numberSlotsPerTaskManager;
+		}
+
+		public Time getShutdownTimeout() {
+			return shutdownTimeout;
+		}
+	}
+}
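
As an aside, a test that needs a differently sized cluster could, in principle, declare
its own MiniClusterResource as a @ClassRule instead of relying on the one in
AbstractTestBase. A sketch, assuming the constructor shown above (the test class itself
is hypothetical):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.test.util.MiniClusterResource;

    import org.junit.ClassRule;
    import org.junit.Test;

    public class CustomClusterITCase {

        // Two TaskManagers with two slots each, instead of the defaults
        // used by AbstractTestBase.
        @ClassRule
        public static final MiniClusterResource MINI_CLUSTER = new MiniClusterResource(
            new MiniClusterResource.MiniClusterResourceConfiguration(
                new Configuration(),
                2,
                2));

        @Test
        public void testWithFourSlots() throws Exception {
            // The resource has registered the stream environment as context,
            // so getExecutionEnvironment() targets the custom mini cluster.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("a", "b", "c").print();
            env.execute();
        }
    }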

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 6c1ce74..ac5b17d 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -170,7 +170,8 @@ public class TestEnvironment extends ExecutionEnvironment {
 				return new TestEnvironment(
 					miniCluster,
 					parallelism,
-					false, jarFiles,
+					false,
+					jarFiles,
 					classPaths
 				);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java
index 63ce3ab..29d442b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java
@@ -22,16 +22,10 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.BufferedReader;
@@ -55,30 +49,6 @@ public class DistributedCacheTest extends AbstractTestBase {
 			"keiner\n" +
 			"meine\n";
 
-	private static final int PARALLELISM = 4;
-
-	private static LocalFlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void setup() throws Exception {
-		cluster = TestBaseUtils.startCluster(1, PARALLELISM, false, false, true);
-		TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
-		TestEnvironment.setAsContext(cluster, PARALLELISM);
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		TestStreamEnvironment.unsetAsContext();
-		TestEnvironment.unsetAsContext();
-		TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-	}
-
-	// ------------------------------------------------------------------------
-
-	public DistributedCacheTest() {
-		super(new Configuration());
-	}
-
 	// ------------------------------------------------------------------------
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-tests/src/test/java/org/apache/flink/test/example/scala/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/scala/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/scala/WordCountITCase.java
index d7a00b2..6c5bda3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/scala/WordCountITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/scala/WordCountITCase.java
@@ -31,12 +31,6 @@ public class WordCountITCase extends JavaProgramTestBase {
 	protected String textPath;
 	protected String resultPath;
 
-	public WordCountITCase(){
-		setParallelism(4);
-		setNumTaskManagers(2);
-		setTaskManagerNumSlots(2);
-	}
-
 	@Override
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
index e425f29..de40bbd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -37,6 +36,7 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import java.io.BufferedReader;
+import java.util.Arrays;
 import java.util.Collection;
 
 /**
@@ -52,12 +52,14 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTest
 
 	private static final int NUM_EDGES = 10000;
 
+	private final boolean extraMapper;
+
 	protected String verticesPath;
 	protected String edgesPath;
 	protected String resultPath;
 
-	public ConnectedComponentsWithDeferredUpdateITCase(Configuration config) {
-		super(config);
+	public ConnectedComponentsWithDeferredUpdateITCase(boolean extraMapper) {
+		this.extraMapper = extraMapper;
 	}
 
 	@Override
@@ -69,8 +71,6 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTest
 
 	@Override
 	protected void testProgram() throws Exception {
-		boolean extraMapper = config.getBoolean("ExtraMapper", false);
-
 		// set up execution environment
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -99,6 +99,8 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTest
 			delta = changes.map(
 					// ID Mapper
 					new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+						private static final long serialVersionUID = -3929364091829757322L;
+
 						@Override
 						public Tuple2<Long, Long> map(Tuple2<Long, Long> v) throws Exception {
 							return v;
@@ -127,13 +129,7 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTest
 
 	@Parameters
 	public static Collection<Object[]> getConfigurations() {
-		Configuration config1 = new Configuration();
-		config1.setBoolean("ExtraMapper", false);
-
-		Configuration config2 = new Configuration();
-		config2.setBoolean("ExtraMapper", true);
-
-		return toParameterList(config1, config2);
+		return Arrays.asList(new Object[]{false}, new Object[]{true});
 	}
 
 	private static final class UpdateComponentIdMatchNonPreserving

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
index 74e3da2..c9779f4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
@@ -48,10 +48,6 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 	private String resultPath;
 	private String expectedResult;
 
-	public DependencyConnectedComponentsITCase(){
-		setTaskManagerNumSlots(parallelism);
-	}
-
 	@Override
 	protected void preSubmit() throws Exception {
 		verticesInput.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
index 75bf8f0..b634005 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.operators.util.CollectionDataSets;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
@@ -40,10 +39,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -54,10 +51,10 @@ public class TypeHintITCase extends JavaProgramTestBase {
 
 	private static final int NUM_PROGRAMS = 9;
 
-	private int curProgId = config.getInteger("ProgramId", -1);
+	private final int curProgId;
 
-	public TypeHintITCase(Configuration config) {
-		super(config);
+	public TypeHintITCase(int curProgId) {
+		this.curProgId = curProgId;
 	}
 
 	@Override
@@ -66,17 +63,15 @@ public class TypeHintITCase extends JavaProgramTestBase {
 	}
 
 	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+	public static Collection<Object[]> getConfigurations() {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+		Collection<Object[]> parameters = new ArrayList<>(NUM_PROGRAMS);
 
 		for (int i = 1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
+			parameters.add(new Object[]{i});
 		}
 
-		return toParameterList(tConfigs);
+		return parameters;
 	}
 
 	private static class TypeHintProgs {

http://git-wip-us.apache.org/repos/asf/flink/blob/3c5c8325/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 92bf6d6..e0bd451 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -19,10 +19,10 @@
 package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -31,7 +31,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Ignore;
@@ -52,12 +54,6 @@ public class NetworkStackThroughputITCase extends TestLogger {
 
 	private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
 
-	private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
-
-	private static final String PARALLELISM_CONFIG_KEY = "num.subtasks";
-
-	private static final String NUM_SLOTS_PER_TM_CONFIG_KEY = "num.slots.per.tm";
-
 	private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
 
 	private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
@@ -68,101 +64,6 @@ public class NetworkStackThroughputITCase extends TestLogger {
 
 	// ------------------------------------------------------------------------
 
-	// wrapper to reuse JavaProgramTestBase code in runs via main()
-	private static class TestBaseWrapper extends JavaProgramTestBase {
-
-		private int dataVolumeGb;
-		private boolean useForwarder;
-		private boolean isSlowSender;
-		private boolean isSlowReceiver;
-		private int parallelism;
-
-		public TestBaseWrapper(Configuration config) {
-			super(config);
-
-			dataVolumeGb = config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
-			useForwarder = config.getBoolean(USE_FORWARDER_CONFIG_KEY, true);
-			isSlowSender = config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);
-			isSlowReceiver = config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
-			parallelism = config.getInteger(PARALLELISM_CONFIG_KEY, 1);
-
-			int numSlots = config.getInteger(NUM_SLOTS_PER_TM_CONFIG_KEY, 1);
-
-			if (parallelism % numSlots != 0) {
-				throw new RuntimeException("The test case defines a parallelism that is not a multiple of the slots per task manager.");
-			}
-
-			setNumTaskManagers(parallelism / numSlots);
-			setTaskManagerNumSlots(numSlots);
-		}
-
-		protected JobGraph getJobGraph() throws Exception {
-			return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism);
-		}
-
-		private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender,
-										boolean isSlowReceiver, int numSubtasks) {
-			JobGraph jobGraph = new JobGraph("Speed Test");
-			SlotSharingGroup sharingGroup = new SlotSharingGroup();
-
-			JobVertex producer = new JobVertex("Speed Test Producer");
-			jobGraph.addVertex(producer);
-			producer.setSlotSharingGroup(sharingGroup);
-
-			producer.setInvokableClass(SpeedTestProducer.class);
-			producer.setParallelism(numSubtasks);
-			producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
-			producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
-
-			JobVertex forwarder = null;
-			if (useForwarder) {
-				forwarder = new JobVertex("Speed Test Forwarder");
-				jobGraph.addVertex(forwarder);
-				forwarder.setSlotSharingGroup(sharingGroup);
-
-				forwarder.setInvokableClass(SpeedTestForwarder.class);
-				forwarder.setParallelism(numSubtasks);
-			}
-
-			JobVertex consumer = new JobVertex("Speed Test Consumer");
-			jobGraph.addVertex(consumer);
-			consumer.setSlotSharingGroup(sharingGroup);
-
-			consumer.setInvokableClass(SpeedTestConsumer.class);
-			consumer.setParallelism(numSubtasks);
-			consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
-
-			if (useForwarder) {
-				forwarder.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL,
-					ResultPartitionType.PIPELINED);
-				consumer.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL,
-					ResultPartitionType.PIPELINED);
-			}
-			else {
-				consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL,
-					ResultPartitionType.PIPELINED);
-			}
-
-			return jobGraph;
-		}
-
-		@Override
-		protected void testProgram() throws Exception {
-			JobExecutionResult jer = executor.submitJobAndWait(getJobGraph(), false);
-			int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
-
-			long dataVolumeMbit = dataVolumeGb * 8192;
-			long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);
-
-			int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
-
-			LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
-					"data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
 	/**
 	 * Invokable that produces records and allows slowdown via {@link #IS_SLOW_EVERY_NUM_RECORDS}
 	 * and {@link #IS_SLOW_SENDER_CONFIG_KEY} and creates records of different data sizes via {@link
@@ -307,22 +208,110 @@ public class NetworkStackThroughputITCase extends TestLogger {
 		};
 
 		for (Object[] p : configParams) {
-			Configuration config = new Configuration();
-			config.setInteger(DATA_VOLUME_GB_CONFIG_KEY, (Integer) p[0]);
-			config.setBoolean(USE_FORWARDER_CONFIG_KEY, (Boolean) p[1]);
-			config.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, (Boolean) p[2]);
-			config.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, (Boolean) p[3]);
-			config.setInteger(PARALLELISM_CONFIG_KEY, (Integer) p[4]);
-			config.setInteger(NUM_SLOTS_PER_TM_CONFIG_KEY, (Integer) p[5]);
+			final int dataVolumeGb = (Integer) p[0];
+			final boolean useForwarder = (Boolean) p[1];
+			final boolean isSlowSender = (Boolean) p[2];
+			final boolean isSlowReceiver = (Boolean) p[3];
+			final int parallelism = (Integer) p[4];
+			final int numSlotsPerTaskManager = (Integer) p[5];
+
+			if (parallelism % numSlotsPerTaskManager != 0) {
+				throw new RuntimeException("The test case defines a parallelism that is not a multiple of the slots per task manager.");
+			}
+
+			final int numTaskManagers = parallelism / numSlotsPerTaskManager;
+
+			final LocalFlinkMiniCluster localFlinkMiniCluster = TestBaseUtils.startCluster(
+				numTaskManagers,
+				numSlotsPerTaskManager,
+				false,
+				false,
+				true);
+
+			try {
+				System.out.println(Arrays.toString(p));
+				testProgram(
+					localFlinkMiniCluster,
+					dataVolumeGb,
+					useForwarder,
+					isSlowSender,
+					isSlowReceiver,
+					parallelism);
+			} finally {
+				TestBaseUtils.stopCluster(localFlinkMiniCluster, FutureUtils.toFiniteDuration(TestingUtils.TIMEOUT()));
+			}
+		}
+	}
+
+	private void testProgram(
+			LocalFlinkMiniCluster localFlinkMiniCluster,
+			final int dataVolumeGb,
+			final boolean useForwarder,
+			final boolean isSlowSender,
+			final boolean isSlowReceiver,
+			final int parallelism) throws Exception {
+		JobExecutionResult jer = localFlinkMiniCluster.submitJobAndWait(
+			createJobGraph(
+				dataVolumeGb,
+				useForwarder,
+				isSlowSender,
+				isSlowReceiver,
+				parallelism),
+			false);
+
+		long dataVolumeMbit = dataVolumeGb * 8192;
+		long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);
+
+		int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
+
+		LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
+			"data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
+	}
+
+	private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender,
+									boolean isSlowReceiver, int numSubtasks) {
+		JobGraph jobGraph = new JobGraph("Speed Test");
+		SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
-			TestBaseWrapper test = new TestBaseWrapper(config);
-			test.startCluster();
+		JobVertex producer = new JobVertex("Speed Test Producer");
+		jobGraph.addVertex(producer);
+		producer.setSlotSharingGroup(sharingGroup);
 
-			System.out.println(Arrays.toString(p));
-			test.testProgram();
+		producer.setInvokableClass(SpeedTestProducer.class);
+		producer.setParallelism(numSubtasks);
+		producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
+		producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
 
-			test.stopCluster();
+		JobVertex forwarder = null;
+		if (useForwarder) {
+			forwarder = new JobVertex("Speed Test Forwarder");
+			jobGraph.addVertex(forwarder);
+			forwarder.setSlotSharingGroup(sharingGroup);
+
+			forwarder.setInvokableClass(SpeedTestForwarder.class);
+			forwarder.setParallelism(numSubtasks);
 		}
+
+		JobVertex consumer = new JobVertex("Speed Test Consumer");
+		jobGraph.addVertex(consumer);
+		consumer.setSlotSharingGroup(sharingGroup);
+
+		consumer.setInvokableClass(SpeedTestConsumer.class);
+		consumer.setParallelism(numSubtasks);
+		consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
+
+		if (useForwarder) {
+			forwarder.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL,
+				ResultPartitionType.PIPELINED);
+			consumer.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL,
+				ResultPartitionType.PIPELINED);
+		}
+		else {
+			consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL,
+				ResultPartitionType.PIPELINED);
+		}
+
+		return jobGraph;
 	}
 
 	private void runAllTests() throws Exception {