You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/09 08:14:49 UTC

[6/7] flink git commit: [FLINK-7909] Replace StreamingMultipleProgramsTestBase by AbstractTestBase

[FLINK-7909] Replace StreamingMultipleProgramsTestBase by AbstractTestBase

AbstractTestBase fully subsumes the functionality of
StreamingMultipleProgramsTestBase, since it is now the most general test base
for streaming and batch jobs. As a consequence, we can safely remove
StreamingMultipleProgramsTestBase and let all corresponding tests extend
AbstractTestBase instead.

This closes #4896.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b90210e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b90210e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b90210e3

Branch: refs/heads/master
Commit: b90210e3712a54ad85a33dfc308a03e0c4a2a250
Parents: 3c5c832
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Oct 24 16:20:15 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jan 9 08:05:51 2018 +0100

----------------------------------------------------------------------
 .../ElasticsearchSinkTestBase.java              |  8 ++-
 .../connectors/fs/RollingSinkITCase.java        | 22 +++++--
 .../connectors/fs/RollingSinkSecuredITCase.java | 62 +++++++++----------
 .../mapred/HadoopIOFormatsITCase.java           | 22 +++----
 .../mapred/HadoopMapredITCase.java              |  1 -
 .../mapreduce/HadoopInputOutputITCase.java      |  1 -
 .../apache/flink/storm/split/SplitITCase.java   |  4 +-
 .../examples/windowing/TopSpeedWindowing.java   |  1 -
 .../streaming/test/StreamingExamplesITCase.java |  4 +-
 .../TopSpeedWindowingExampleITCase.java         | 50 +++++++++------
 .../socket/SocketWindowWordCountITCase.java     |  4 +-
 .../examples/StreamingExamplesITCase.scala      | 22 +++----
 .../java/org/apache/flink/cep/CEPITCase.java    |  4 +-
 .../table/runtime/stream/sql/JavaSqlITCase.java |  4 +-
 .../flink/table/api/stream/ExplainTest.scala    |  4 +-
 .../UnsupportedOpsValidationTest.scala          |  4 +-
 .../runtime/stream/TimeAttributesITCase.scala   | 10 +--
 .../runtime/stream/sql/TableSourceITCase.scala  |  4 +-
 .../table/runtime/stream/table/CalcITCase.scala |  9 ++-
 .../runtime/stream/table/CorrelateITCase.scala  |  4 +-
 .../stream/table/GroupWindowITCase.scala        | 10 +--
 .../stream/table/SetOperatorsITCase.scala       |  4 +-
 .../runtime/stream/table/TableSinkITCase.scala  |  7 +--
 .../stream/table/TableSourceITCase.scala        |  6 +-
 .../utils/StreamingWithStateTestBase.scala      |  4 +-
 .../flink/streaming/api/DataStreamTest.java     |  3 +-
 .../scala/api/CsvOutputFormatITCase.java        |  4 +-
 .../scala/api/TextOutputFormatITCase.java       |  4 +-
 .../streaming/api/scala/CoGroupJoinITCase.scala |  7 +--
 .../streaming/api/scala/DataStreamTest.scala    | 28 ++++++---
 .../streaming/api/scala/SideOutputITCase.scala  |  4 +-
 .../api/scala/TimeWindowTranslationTest.scala   |  4 +-
 .../streaming/api/scala/WindowFoldITCase.scala  |  6 +-
 .../api/scala/WindowReduceITCase.scala          |  6 +-
 .../util/StreamingMultipleProgramsTestBase.java | 64 --------------------
 .../flink/test/util/AbstractTestBase.java       | 14 ++---
 .../flink/test/util/MiniClusterResource.java    | 11 ++--
 .../test/util/MultipleProgramsTestBase.java     | 48 ++++-----------
 .../apache/flink/test/util/TestBaseUtils.java   |  2 +-
 .../CoStreamCheckpointingITCase.java            |  4 +-
 .../StreamCheckpointNotifierITCase.java         |  4 +-
 .../test/state/ManualWindowSpeedITCase.java     |  4 +-
 .../streaming/api/StreamingOperatorsITCase.java |  4 +-
 .../runtime/ChainedRuntimeContextITCase.java    |  4 +-
 .../streaming/runtime/CoGroupJoinITCase.java    |  4 +-
 .../test/streaming/runtime/CoStreamITCase.java  |  4 +-
 .../streaming/runtime/DataStreamPojoITCase.java |  4 +-
 .../streaming/runtime/DirectedOutputITCase.java |  4 +-
 .../test/streaming/runtime/IterateITCase.java   | 26 ++++----
 .../streaming/runtime/OutputSplitterITCase.java |  4 +-
 .../streaming/runtime/PartitionerITCase.java    |  4 +-
 .../streaming/runtime/SelfConnectionITCase.java |  4 +-
 .../streaming/runtime/SideOutputITCase.java     |  4 +-
 .../streaming/runtime/StateBackendITCase.java   |  4 +-
 .../runtime/StreamTaskTimerITCase.java          |  4 +-
 .../streaming/runtime/WindowFoldITCase.java     |  4 +-
 .../sessionwindows/SessionWindowITCase.java     |  4 +-
 57 files changed, 253 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index 297bc5d..b90e8ed 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.elasticsearch.client.Client;
@@ -31,6 +31,8 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
@@ -44,7 +46,9 @@ import static org.junit.Assert.fail;
 /**
  * Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} implementations.
  */
-public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgramsTestBase {
+public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkTestBase.class);
 
 	protected static final String CLUSTER_NAME = "test-cluster";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 10d1846..78f643f 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -32,9 +32,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
@@ -83,13 +84,14 @@ import java.util.Map;
  * @deprecated should be removed with the {@link RollingSink}.
  */
 @Deprecated
-public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
+public class RollingSinkITCase extends TestLogger {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkITCase.class);
 
 	@ClassRule
 	public static TemporaryFolder tempFolder = new TemporaryFolder();
 
+	protected static MiniClusterResource miniClusterResource;
 	protected static MiniDFSCluster hdfsCluster;
 	protected static org.apache.hadoop.fs.FileSystem dfs;
 	protected static String hdfsURI;
@@ -98,7 +100,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 	protected static File dataDir;
 
 	@BeforeClass
-	public static void createHDFS() throws IOException {
+	public static void setup() throws Exception {
 
 		LOG.info("In RollingSinkITCase: Starting MiniDFSCluster ");
 
@@ -113,12 +115,22 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		hdfsURI = "hdfs://"
 				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
 				+ "/";
+
+		miniClusterResource = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				new org.apache.flink.configuration.Configuration(),
+				1,
+				4));
 	}
 
 	@AfterClass
-	public static void destroyHDFS() {
+	public static void teardown() throws Exception {
 		LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster ");
 		hdfsCluster.shutdown();
+
+		if (miniClusterResource != null) {
+			miniClusterResource.after();
+		}
 	}
 
 	/**
@@ -926,6 +938,8 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 	}
 
 	private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> {
+		private static final long serialVersionUID = 761584896826819477L;
+
 		private String key;
 		private String expect;
 		public StreamWriterWithConfigCheck(String key, String expect) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 7595ac0..b76d087 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.modules.HadoopModule;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.test.util.TestingSecurityContext;
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileWriter;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -94,19 +93,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 	 * and out-of-order sequence for secure cluster
 	 */
 	@BeforeClass
-	public static void setup() throws Exception {}
-
-	@AfterClass
-	public static void teardown() throws Exception {}
-
-	@BeforeClass
-	public static void createHDFS() throws IOException {}
-
-	@AfterClass
-	public static void destroyHDFS() {}
-
-	@BeforeClass
-	public static void startSecureCluster() throws Exception {
+	public static void setup() throws Exception {
 
 		skipIfHadoopVersionIsNotAppropriate();
 
@@ -158,20 +145,29 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
 				+ "/";
 
-		startSecureFlinkClusterWithRecoveryModeEnabled();
+		Configuration configuration = startSecureFlinkClusterWithRecoveryModeEnabled();
+
+		miniClusterResource = new MiniClusterResource(new MiniClusterResource.MiniClusterResourceConfiguration(
+			configuration,
+			1,
+			4));
+
+		miniClusterResource.before();
 	}
 
 	@AfterClass
-	public static void teardownSecureCluster() throws Exception {
+	public static void teardown() throws Exception {
 		LOG.info("tearing down secure cluster environment");
 
-		TestStreamEnvironment.unsetAsContext();
-		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-
 		if (hdfsCluster != null) {
 			hdfsCluster.shutdown();
 		}
 
+		if (miniClusterResource != null) {
+			miniClusterResource.after();
+			miniClusterResource = null;
+		}
+
 		SecureTestEnvironment.cleanup();
 	}
 
@@ -208,30 +204,26 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 		conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003");
 	}
 
-	private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
+	private static Configuration startSecureFlinkClusterWithRecoveryModeEnabled() {
 		try {
 			LOG.info("Starting Flink and ZK in secure mode");
 
 			dfs.mkdirs(new Path("/flink/checkpoints"));
 			dfs.mkdirs(new Path("/flink/recovery"));
 
-			org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
-
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
-			config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
-			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
-			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
-			config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
+			final Configuration result = new Configuration();
 
-			SecureTestEnvironment.populateFlinkSecureConfigurations(config);
+			result.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
+			result.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
+			result.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+			result.setString(CoreOptions.STATE_BACKEND, "filesystem");
+			result.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
+			result.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
+			result.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
 
-			cluster = TestBaseUtils.startCluster(config, false);
-			TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
+			SecureTestEnvironment.populateFlinkSecureConfigurations(result);
 
+			return result;
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
index 46102a2..753d813 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -23,9 +23,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.OperatingSystem;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -44,11 +42,9 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 
 /**
  * Integration tests for Hadoop IO formats.
@@ -58,14 +54,14 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
 
 	private static final int NUM_PROGRAMS = 2;
 
-	private int curProgId = config.getInteger("ProgramId", -1);
+	private final int curProgId;
 	private String[] resultPath;
 	private String[] expectedResult;
 	private String sequenceFileInPath;
 	private String sequenceFileInPathNull;
 
-	public HadoopIOFormatsITCase(Configuration config) {
-		super(config);
+	public HadoopIOFormatsITCase(int curProgId) {
+		this.curProgId = curProgId;
 	}
 
 	@Before
@@ -143,17 +139,15 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
 	}
 
 	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+	public static Collection<Object[]> getConfigurations() {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+		Collection<Object[]> programIds = new ArrayList<>(NUM_PROGRAMS);
 
 		for (int i = 1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
+			programIds.add(new Object[]{i});
 		}
 
-		return TestBaseUtils.toParameterList(tConfigs);
+		return programIds;
 	}
 
 	private static class HadoopIOFormatPrograms {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index 145eaaa..db2ad8e 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -44,7 +44,6 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);
 		resultPath = getTempDirPath("result");
-		this.setParallelism(4);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index a23a50d..783a5a6 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -46,7 +46,6 @@ public class HadoopInputOutputITCase extends JavaProgramTestBase {
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);
 		resultPath = getTempDirPath("result");
-		this.setParallelism(4);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
index d53493c..7152cf2 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
@@ -19,7 +19,7 @@ package org.apache.flink.storm.split;
 
 import org.apache.flink.storm.split.SpoutSplitExample.Enrich;
 import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -32,7 +32,7 @@ import java.io.IOException;
 /**
  * Tests for split examples.
  */
-public class SplitITCase extends StreamingMultipleProgramsTestBase {
+public class SplitITCase extends AbstractTestBase {
 
 	private String output;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 7543bab..ee06cd4 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -55,7 +55,6 @@ public class TopSpeedWindowing {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.getConfig().setGlobalJobParameters(params);
-		env.setParallelism(1);
 
 		@SuppressWarnings({"rawtypes", "serial"})
 		DataStream<Tuple4<Integer, Integer, Double, Long>> carData;

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index 4c47d59..cfe899e 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonDa
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
 import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
 import org.apache.flink.streaming.test.examples.join.WindowJoinData;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.Test;
@@ -40,7 +40,7 @@ import java.io.File;
 /**
  * Integration test for streaming programs in Java examples.
  */
-public class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
+public class StreamingExamplesITCase extends AbstractTestBase {
 
 	@Test
 	public void testIterateExample() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
index c2f3164..320dd5f 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -17,33 +17,47 @@
 
 package org.apache.flink.streaming.test.examples.windowing;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
 import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
 
 /**
  * Tests for {@link TopSpeedWindowing}.
  */
-public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
+public class TopSpeedWindowingExampleITCase extends TestLogger {
 
-	protected String textPath;
-	protected String resultPath;
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
-		resultPath = getTempDirPath("result");
-	}
+	@ClassRule
+	public static MiniClusterResource miniClusterResource = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			new Configuration(),
+			1,
+			1));
 
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath);
-	}
+	@Test
+	public void testTopSpeedWindowingExampleITCase() throws Exception {
+		File inputFile = temporaryFolder.newFile();
+		FileUtils.writeFileUtf8(inputFile, TopSpeedWindowingExampleData.CAR_DATA);
+
+		final String resultPath = temporaryFolder.newFolder().toURI().toString();
 
-	@Override
-	protected void testProgram() throws Exception {
-		TopSpeedWindowing.main(new String[]{
-				"--input", textPath,
-				"--output", resultPath});
+		TopSpeedWindowing.main(new String[] {
+			"--input", inputFile.getAbsolutePath(),
+			"--output", resultPath});
+
+		compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
index a09b22e..91ee9bf 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.test.socket;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.examples.socket.SocketWindowWordCount;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.Test;
 
@@ -38,7 +38,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for {@link SocketWindowWordCount}.
  */
-public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBase {
+public class SocketWindowWordCountITCase extends AbstractTestBase {
 
 	@Test
 	public void testJavaProgram() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index 24d1444..7407294 100644
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -24,7 +24,6 @@ import org.apache.commons.io.FileUtils
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
 import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
@@ -37,7 +36,6 @@ import org.apache.flink.streaming.scala.examples.twitter.TwitterExample
 import org.apache.flink.streaming.scala.examples.windowing.{SessionWindowing, WindowWordCount}
 import org.apache.flink.streaming.scala.examples.wordcount.WordCount
 import org.apache.flink.streaming.test.examples.join.WindowJoinData
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.test.testdata.WordCountData
 import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
 import org.junit.Test
@@ -45,12 +43,12 @@ import org.junit.Test
 /**
  * Integration test for streaming programs in Scala examples.
  */
-class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
+class StreamingExamplesITCase extends AbstractTestBase {
 
   @Test
   def testIterateExample(): Unit = {
-    val inputPath = AbstractTestBase.createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
+    val resultPath = getTempDirPath("result")
 
     // the example is inherently non-deterministic. The iteration timeout of 5000 ms
     // is frequently not enough to make the test run stable on CI infrastructure
@@ -99,14 +97,14 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testIncrementalLearningSkeleton(): Unit = {
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val resultPath = getTempDirPath("result")
     IncrementalLearningSkeleton.main(Array("--output", resultPath))
     TestBaseUtils.compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath)
   }
 
   @Test
   def testTwitterExample(): Unit = {
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val resultPath = getTempDirPath("result")
     TwitterExample.main(Array("--output", resultPath))
     TestBaseUtils.compareResultsByLinesInMemory(
       TwitterExampleData.STREAMING_COUNTS_AS_TUPLES,
@@ -115,7 +113,7 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testSessionWindowing(): Unit = {
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val resultPath = getTempDirPath("result")
     SessionWindowing.main(Array("--output", resultPath))
     TestBaseUtils.compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath)
   }
@@ -124,8 +122,8 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
   def testWindowWordCount(): Unit = {
     val windowSize = "250"
     val slideSize = "150"
-    val textPath = AbstractTestBase.createTempFile("text.txt", WordCountData.TEXT)
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val textPath = createTempFile("text.txt", WordCountData.TEXT)
+    val resultPath = getTempDirPath("result")
 
     WindowWordCount.main(Array(
       "--input", textPath,
@@ -142,8 +140,8 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testWordCount(): Unit = {
-    val textPath = AbstractTestBase.createTempFile("text.txt", WordCountData.TEXT)
-    val resultPath = AbstractTestBase.getTempDirPath("result")
+    val textPath = createTempFile("text.txt", WordCountData.TEXT)
+    val resultPath = getTempDirPath("result")
 
     WordCount.main(Array(
       "--input", textPath,

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 81b83a3..4f2383a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Either;
 
 import org.junit.After;
@@ -48,7 +48,7 @@ import java.util.Map;
  * End to end tests of both CEP operators and {@link NFA}.
  */
 @SuppressWarnings("serial")
-public class CEPITCase extends StreamingMultipleProgramsTestBase {
+public class CEPITCase extends AbstractTestBase {
 
 	private String resultPath;
 	private String expected;

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
index f3d0309..44f89cc 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
@@ -25,12 +25,12 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.runtime.utils.JavaStreamTestData;
 import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
 import org.junit.Test;
@@ -41,7 +41,7 @@ import java.util.List;
 /**
  * Integration tests for streaming SQL.
  */
-public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
+public class JavaSqlITCase extends AbstractTestBase {
 
 	@Test
 	public void testRowRegisterRowWithNames() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 820de08..741a3cb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -20,13 +20,13 @@ package org.apache.flink.table.api.stream
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Assert.assertEquals
 import org.junit._
 
-class ExplainTest extends StreamingMultipleProgramsTestBase {
+class ExplainTest extends AbstractTestBase {
 
   private val testFilePath = this.getClass.getResource("/").getFile
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
index c1ad08c..1de2b1e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
@@ -19,13 +19,13 @@
 package org.apache.flink.table.api.stream.table.validation
 
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, ValidationException}
 import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Test
 
-class UnsupportedOpsValidationTest extends StreamingMultipleProgramsTestBase {
+class UnsupportedOpsValidationTest extends AbstractTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testSort(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index a301354..c553ee6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.runtime.stream
 
-import java.math.BigDecimal
 import java.lang.{Integer => JInt, Long => JLong}
+import java.math.BigDecimal
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -28,15 +28,15 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
 import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types}
 import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
+import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
 import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
+import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
 import org.apache.flink.table.runtime.utils.StreamITCase
 import org.apache.flink.table.utils.{MemoryTableSinkUtil, TestTableSourceWithTime}
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
@@ -46,7 +46,7 @@ import scala.collection.mutable
 /**
   * Tests for access and materialization of time attributes.
   */
-class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
+class TimeAttributesITCase extends AbstractTestBase {
 
   val data = List(
     (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index 30ada56..246ce2e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -20,17 +20,17 @@ package org.apache.flink.table.runtime.stream.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.mutable
 
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
+class TableSourceITCase extends AbstractTestBase {
 
   @Test
   def testCsvTableSource(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 46788f5..a20b626 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -20,20 +20,19 @@ package org.apache.flink.table.runtime.stream.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.Literal
 import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2, SplitUDF}
-import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, UserDefinedFunctionTestUtils}
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.mutable
 
-class CalcITCase extends StreamingMultipleProgramsTestBase {
+class CalcITCase extends AbstractTestBase {
 
   @Test
   def testSimpleSelectAll(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
index 215526d..0f563e6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
@@ -21,19 +21,19 @@ import java.lang.{Boolean => JBoolean}
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
 import org.apache.flink.table.expressions.utils.{Func18, RichFunc2}
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, _}
 import org.apache.flink.table.utils._
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
 import scala.collection.mutable
 
-class CorrelateITCase extends StreamingMultipleProgramsTestBase {
+class CorrelateITCase extends AbstractTestBase {
 
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
index 1eebeee..588cff1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
@@ -22,17 +22,17 @@ import java.math.BigDecimal
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithMerge, WeightedAvg, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.table.runtime.stream.table.GroupWindowITCase._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithMerge, WeightedAvg, WeightedAvgWithMerge}
 import org.apache.flink.table.runtime.utils.StreamITCase
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
@@ -43,7 +43,7 @@ import scala.collection.mutable
   * We only test some aggregations until better testing of constructed DataStream
   * programs is possible.
   */
-class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
+class GroupWindowITCase extends AbstractTestBase {
   private val queryConfig = new StreamQueryConfig()
   queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index 5e15e14..479bce2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -20,18 +20,18 @@ package org.apache.flink.table.runtime.stream.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.mutable
 
-class SetOperatorsITCase extends StreamingMultipleProgramsTestBase {
+class SetOperatorsITCase extends AbstractTestBase {
 
   @Test
   def testUnion(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index b44d8ef..f1badee 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -31,22 +31,21 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.utils.MemoryTableSinkUtil
-import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.mutable
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
-class TableSinkITCase extends StreamingMultipleProgramsTestBase {
+class TableSinkITCase extends AbstractTestBase {
 
   @Test
   def testInsertIntoRegisteredTableSink(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
index c9ea30a..d1a88b7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
@@ -29,12 +29,12 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JExecEnv}
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
 import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
 import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.table.utils._
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 import org.junit.Assert._
@@ -43,7 +43,7 @@ import org.junit.Test
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
+class TableSourceITCase extends AbstractTestBase {
 
   @Test(expected = classOf[TableException])
   def testInvalidDatastreamType(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
index 8c41f22..5cfab4a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
@@ -18,11 +18,11 @@
 package org.apache.flink.table.runtime.utils
 
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Rule
 import org.junit.rules.TemporaryFolder
 
-class StreamingWithStateTestBase extends StreamingMultipleProgramsTestBase {
+class StreamingWithStateTestBase extends AbstractTestBase {
 
   val _tempFolder = new TemporaryFolder
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index b76ade7..6fb06d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -70,6 +70,7 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.core.StringStartsWith;
 import org.junit.Assert;
@@ -90,7 +91,7 @@ import static org.junit.Assert.fail;
  * Tests for {@link DataStream}.
  */
 @SuppressWarnings("serial")
-public class DataStreamTest {
+public class DataStreamTest extends TestLogger {
 
 	/**
 	 * Tests union functionality. This ensures that self-unions and unions of streams

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
index fb7c765..2311092 100644
--- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
+++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.scala.api;
 
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.After;
 import org.junit.Before;
@@ -34,7 +34,7 @@ import static org.junit.Assert.fail;
 /**
  * IT cases for the {@link org.apache.flink.api.java.io.CsvOutputFormat}.
  */
-public class CsvOutputFormatITCase extends StreamingMultipleProgramsTestBase  {
+public class CsvOutputFormatITCase extends AbstractTestBase {
 
 	protected String resultPath;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
index 84b81e2..c2e450a 100644
--- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
+++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.scala.api;
 
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.After;
 import org.junit.Before;
@@ -34,7 +34,7 @@ import static org.junit.Assert.fail;
 /**
  * IT cases for the {@link org.apache.flink.api.java.io.TextOutputFormat}.
  */
-public class TextOutputFormatITCase extends StreamingMultipleProgramsTestBase {
+public class TextOutputFormatITCase extends AbstractTestBase {
 
 	protected String resultPath;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index fddbe00..5412e8e 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -27,14 +27,13 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-
-import org.junit.Test
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Assert._
+import org.junit.Test
 
 import scala.collection.mutable
 
-class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
+class CoGroupJoinITCase extends AbstractTestBase {
 
   @Test
   def testCoGroup(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 60c609d..6158c8e 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.scala
 import java.lang
 
 import org.apache.flink.api.common.functions._
-import org.apache.flink.api.common.operators.ResourceSpec
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.ProcessFunction
@@ -32,12 +31,12 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
 import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.streaming.runtime.partitioner._
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
 import org.junit.Test
 
-class DataStreamTest extends StreamingMultipleProgramsTestBase {
+class DataStreamTest extends AbstractTestBase {
 
   @Test
   def testNaming(): Unit = {
@@ -242,7 +241,8 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
    */
   @Test
   def testParallelism() {
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(10)
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val parallelism = env.getParallelism
 
     val src = env.fromElements(new Tuple2[Long, Long](0L, 0L))
     val map = src.map(x => (0L, 0L))
@@ -255,9 +255,12 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val sink = map.addSink(x => {})
 
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    assert(parallelism == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(parallelism == env
+      .getStreamGraph
+      .getStreamNode(sink.getTransformation.getId)
+      .getParallelism)
 
     try {
       src.setParallelism(3)
@@ -268,18 +271,23 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
       }
     }
 
-    env.setParallelism(7)
+    val newParallelism = parallelism - 1
+
+    env.setParallelism(newParallelism)
     // the parallelism does not change since some windowing code takes the parallelism from
     // input operations and that cannot change dynamically
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    assert(parallelism == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(parallelism == env
+      .getStreamGraph
+      .getStreamNode(sink.getTransformation.getId)
+      .getParallelism)
 
     val parallelSource = env.generateSequence(0, 0)
     parallelSource.print()
 
-    assert(7 == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism)
+    assert(newParallelism == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism)
 
     parallelSource.setParallelism(3)
     assert(3 == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism)

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
index 8e66171..e844928 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
@@ -27,8 +27,8 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.test.streaming.runtime.util.TestListResultSink
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
 import org.junit.Test
@@ -36,7 +36,7 @@ import org.junit.Test
 /**
  * Integration test for streaming programs using side outputs.
  */
-class SideOutputITCase extends StreamingMultipleProgramsTestBase {
+class SideOutputITCase extends AbstractTestBase {
 
   /**
     * Test ProcessFunction side output.

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
index 35a56d7..db0fb71 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
 import org.junit.Test
@@ -39,7 +39,7 @@ import org.junit.Test
   * These tests verify that the api calls on [[WindowedStream]] that use the "time" shortcut
   * instantiate the correct window operator.
   */
-class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+class TimeWindowTranslationTest extends AbstractTestBase {
 
   /**
     * Verifies that calls to timeWindow() instantiate a regular

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index dc38758..ef27685 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -31,9 +31,9 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.{Ignore, Test}
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Assert._
+import org.junit.Test
 
 import scala.collection.mutable
 
@@ -41,7 +41,7 @@ import scala.collection.mutable
  * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions
  * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
  */
-class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
+class WindowFoldITCase extends AbstractTestBase {
 
   @Test
   def testFoldWindow(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index ee1dbfd..b2137f5 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -31,9 +31,9 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Assert._
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 import scala.collection.mutable
 
@@ -41,7 +41,7 @@ import scala.collection.mutable
  * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions
  * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
  */
-class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
+class WindowReduceITCase extends AbstractTestBase {
 
   @Test
   def testReduceWindow(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
deleted file mode 100644
index aa6e618..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for streaming unit tests that run multiple tests and want to reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup and
- * shutdown of the Flink clusters (including actor systems, etc) usually dominates
- * the execution of the actual tests.
- *
- * <p>To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the StreamExecutionEnvironment from
- * the context:
- *
- * <pre>
- *   {@literal @}Test
- *   public void someTest() {
- *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- *   {@literal @}Test
- *   public void anotherTest() {
- *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- * </pre>
- */
-public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
-
-	// ------------------------------------------------------------------------
-	//  The mini cluster that is shared across tests
-	// ------------------------------------------------------------------------
-
-	protected static final int DEFAULT_PARALLELISM = 4;
-
-	protected static final Logger LOG = LoggerFactory.getLogger(StreamingMultipleProgramsTestBase.class);
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index bbd250d..65b351d 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -23,8 +23,6 @@ import org.apache.flink.util.FileUtils;
 
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -42,7 +40,7 @@ import java.io.IOException;
  * <pre>
  *   {@literal @}Test
  *   public void someTest() {
- *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  *       // test code
  *       env.execute();
  *   }
@@ -58,8 +56,6 @@ import java.io.IOException;
  */
 public abstract class AbstractTestBase extends TestBaseUtils {
 
-	protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);
-
 	private static final int DEFAULT_PARALLELISM = 4;
 
 	@ClassRule
@@ -77,17 +73,17 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 	//  Temporary File Utilities
 	// --------------------------------------------------------------------------------------------
 
-	public static String getTempDirPath(String dirName) throws IOException {
+	public String getTempDirPath(String dirName) throws IOException {
 		File f = createAndRegisterTempFile(dirName);
 		return f.toURI().toString();
 	}
 
-	public static String getTempFilePath(String fileName) throws IOException {
+	public String getTempFilePath(String fileName) throws IOException {
 		File f = createAndRegisterTempFile(fileName);
 		return f.toURI().toString();
 	}
 
-	public static String createTempFile(String fileName, String contents) throws IOException {
+	public String createTempFile(String fileName, String contents) throws IOException {
 		File f = createAndRegisterTempFile(fileName);
 		if (!f.getParentFile().exists()) {
 			f.getParentFile().mkdirs();
@@ -97,7 +93,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 		return f.toURI().toString();
 	}
 
-	public static File createAndRegisterTempFile(String fileName) throws IOException {
+	public File createAndRegisterTempFile(String fileName) throws IOException {
 		return new File(TEMPORARY_FOLDER.newFolder(), fileName);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index c8872ac..20dbebb 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -60,11 +61,13 @@ public class MiniClusterResource extends ExternalResource {
 
 	@Override
 	public void before() throws Exception {
+		final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
+
+		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.numberSlotsPerTaskManager);
+
 		localFlinkMiniCluster = TestBaseUtils.startCluster(
-			miniClusterResourceConfiguration.getNumberTaskManagers(),
-			miniClusterResourceConfiguration.getNumberSlotsPerTaskManager(),
-			false,
-			false,
+			configuration,
 			true);
 
 		numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers();

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 06792ea..10039e6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,12 +18,8 @@
 
 package org.apache.flink.test.util;
 
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
@@ -57,7 +53,7 @@ import java.util.Collection;
  *
  * }</pre>
  */
-public class MultipleProgramsTestBase extends TestBaseUtils {
+public class MultipleProgramsTestBase extends AbstractTestBase {
 
 	/**
 	 * Enum that defines which execution environment to run the next test on:
@@ -70,16 +66,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 	}
 
 	// ------------------------------------------------------------------------
-	//  The mini cluster that is shared across tests
-	// ------------------------------------------------------------------------
-
-	protected static final int DEFAULT_PARALLELISM = 4;
-
-	protected static boolean startWebServer = false;
-
-	protected static LocalFlinkMiniCluster cluster = null;
-
-	// ------------------------------------------------------------------------
 
 	protected final TestExecutionMode mode;
 
@@ -93,12 +79,21 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 
 	@Before
 	public void setupEnvironment() {
+		TestEnvironment testEnvironment;
 		switch(mode){
 			case CLUSTER:
-				new TestEnvironment(cluster, 4, false).setAsContext();
+				// NOTE: This only works because of quirks built into the TestEnvironment.
+				// TODO: Refactor this so that it no longer depends on those quirks.
+				testEnvironment = miniClusterResource.getTestEnvironment();
+				testEnvironment.getConfig().disableObjectReuse();
+				testEnvironment.setAsContext();
 				break;
 			case CLUSTER_OBJECT_REUSE:
-				new TestEnvironment(cluster, 4, true).setAsContext();
+				// NOTE: This only works because of quirks built into the TestEnvironment.
+				// TODO: Refactor this so that it no longer depends on those quirks.
+				testEnvironment = miniClusterResource.getTestEnvironment();
+				testEnvironment.getConfig().enableObjectReuse();
+				testEnvironment.setAsContext();
 				break;
 			case COLLECTION:
 				new CollectionTestEnvironment().setAsContext();
@@ -120,25 +115,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Cluster setup & teardown
-	// ------------------------------------------------------------------------
-
-	@BeforeClass
-	public static void setup() throws Exception {
-		cluster = TestBaseUtils.startCluster(
-			1,
-			DEFAULT_PARALLELISM,
-			startWebServer,
-			false,
-			true);
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-	}
-
-	// ------------------------------------------------------------------------
 	//  Parametrization lets the tests run in cluster and collection mode
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index d2237ad..5a96326 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -459,7 +459,7 @@ public class TestBaseUtils extends TestLogger {
 				throw new IllegalArgumentException("This path does not denote a local file.");
 			}
 		} catch (URISyntaxException | NullPointerException e) {
-			throw new IllegalArgumentException("This path does not describe a valid local file URI.");
+			throw new IllegalArgumentException("This path does not describe a valid local file URI.", e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index d224905..b207de8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Collector;
 
@@ -57,7 +57,7 @@ import static org.junit.Assert.assertTrue;
  * state reflects the "exactly once" semantics.
  */
 @SuppressWarnings({"serial", "deprecation"})
-public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBase {
+public class CoStreamCheckpointingITCase extends AbstractTestBase {
 
 	private static final long NUM_STRINGS = 10_000L;
 	private static final int PARALLELISM = 4;

http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 16d8b54..7b058a0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.Collector;
 
 import org.junit.Test;
@@ -69,7 +69,7 @@ import static org.junit.Assert.fail;
  * successfully completed checkpoint.
  */
 @SuppressWarnings("serial")
-public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTestBase {
+public class StreamCheckpointNotifierITCase extends AbstractTestBase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class);