You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/22 15:45:39 UTC

[6/9] flink git commit: [hotfix] Introduce builder for MiniClusterResourceConfiguration

[hotfix] Introduce builder for MiniClusterResourceConfiguration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6de6ed48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6de6ed48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6de6ed48

Branch: refs/heads/master
Commit: 6de6ed48532a31a7ba90338ce7e5c2bc61d0ae11
Parents: bf7e101
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 21 14:57:45 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/LocalExecutor.java  |   3 +-
 .../connectors/fs/RollingSinkITCase.java        |   9 +-
 .../connectors/fs/RollingSinkSecuredITCase.java |  11 +-
 .../kafka/KafkaShortRetentionTestBase.java      |  10 +-
 .../connectors/kafka/KafkaTestBase.java         |  10 +-
 .../TopSpeedWindowingExampleITCase.java         |  10 +-
 .../hdfstests/DistributedCacheDfsTest.java      |   9 +-
 .../apache/flink/ml/util/FlinkTestBase.scala    |  10 +-
 .../gateway/local/LocalExecutorITCase.java      |  10 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |  13 +-
 .../HAQueryableStateFsBackendITCase.java        |  10 +-
 .../HAQueryableStateRocksDBBackendITCase.java   |  10 +-
 .../NonHAQueryableStateFsBackendITCase.java     |  10 +-
 ...NonHAQueryableStateRocksDBBackendITCase.java |  10 +-
 .../runtime/webmonitor/WebFrontendITCase.java   |  16 ++-
 .../webmonitor/handlers/JarRunHandlerTest.java  |  14 +-
 .../webmonitor/history/HistoryServerTest.java   |  14 +-
 .../flink/runtime/minicluster/MiniCluster.java  |   6 +-
 .../minicluster/MiniClusterConfiguration.java   |  11 +-
 .../runtime/minicluster/RpcServiceSharing.java  |  28 ++++
 .../runtime/minicluster/MiniClusterITCase.java  |   4 +-
 .../flink/api/scala/ScalaShellITCase.scala      |   6 +-
 .../scala/ScalaShellLocalStartupITCase.scala    |   6 +-
 .../flink/test/util/AbstractTestBase.java       |   9 +-
 .../flink/test/util/MiniClusterResource.java    | 103 ++------------
 .../util/MiniClusterResourceConfiguration.java  | 138 +++++++++++++++++++
 .../apache/flink/test/util/TestBaseUtils.java   |  25 ++++
 .../accumulators/AccumulatorErrorITCase.java    |   9 +-
 .../accumulators/AccumulatorLiveITCase.java     |  10 +-
 .../test/cancelling/CancelingTestBase.java      |  10 +-
 ...tractEventTimeWindowCheckpointingITCase.java |  10 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |  10 +-
 .../KeyedStateCheckpointingITCase.java          |  10 +-
 .../test/checkpointing/RescalingITCase.java     |  10 +-
 .../ResumeCheckpointManuallyITCase.java         |  10 +-
 .../test/checkpointing/SavepointITCase.java     |  51 +++----
 .../StreamFaultToleranceTestBase.java           |  10 +-
 .../WindowCheckpointingITCase.java              |  10 +-
 .../ZooKeeperHighAvailabilityITCase.java        |  10 +-
 .../utils/SavepointMigrationTestBase.java       |  10 +-
 .../test/example/client/JobRetrievalITCase.java |  13 +-
 .../failing/JobSubmissionFailsITCase.java       |  10 +-
 .../manual/StreamingScalabilityAndLatency.java  |  10 +-
 .../flink/test/misc/AutoParallelismITCase.java  |  15 +-
 .../test/misc/CustomSerializationITCase.java    |  10 +-
 .../test/misc/MiscellaneousIssuesITCase.java    |   9 +-
 ...SuccessAfterNetworkBuffersFailureITCase.java |  10 +-
 .../operators/CustomDistributionITCase.java     |  10 +-
 ...SimpleRecoveryFailureRateStrategyITBase.java |   9 +-
 ...RecoveryFixedDelayRestartStrategyITBase.java |  10 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java |  10 +-
 .../flink/test/runtime/NettyEpollITCase.java    |  11 +-
 .../runtime/NetworkStackThroughputITCase.java   |  11 +-
 .../AbstractOperatorRestoreTestBase.java        |  11 +-
 .../test/streaming/runtime/TimestampITCase.java |  10 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   8 +-
 56 files changed, 509 insertions(+), 343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index f837c4f..4e4993a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 
 import java.util.List;
 
@@ -137,7 +138,7 @@ public class LocalExecutor extends PlanExecutor {
 					configuration.getInteger(
 						ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
 						ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
-				.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
+				.setRpcServiceSharing(RpcServiceSharing.SHARED)
 				.setNumSlotsPerTaskManager(
 					configuration.getInteger(
 						TaskManagerOptions.NUM_TASK_SLOTS, 1))

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 78b53d8..93f6d52 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
@@ -118,10 +119,10 @@ public class RollingSinkITCase extends TestLogger {
 				+ "/";
 
 		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				new org.apache.flink.configuration.Configuration(),
-				1,
-				4));
+			new MiniClusterResourceConfiguration.Builder()
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(4)
+				.build());
 
 		miniClusterResource.before();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index b9564ee..7296269 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.test.util.TestingSecurityContext;
@@ -147,10 +148,12 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 
 		Configuration configuration = startSecureFlinkClusterWithRecoveryModeEnabled();
 
-		miniClusterResource = new MiniClusterResource(new MiniClusterResource.MiniClusterResourceConfiguration(
-			configuration,
-			1,
-			4));
+		miniClusterResource = new MiniClusterResource(
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(configuration)
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(4)
+				.build());
 
 		miniClusterResource.before();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index f488325..1e3e1fc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.AfterClass;
@@ -70,10 +71,11 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
 	@ClassRule
 	public static MiniClusterResource flink = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			NUM_TMS,
-			TM_SLOTS));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(TM_SLOTS)
+			.build());
 
 	@ClassRule
 	public static TemporaryFolder tempFolder = new TemporaryFolder();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 697e075..806c90c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -81,10 +82,11 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	@ClassRule
 	public static MiniClusterResource flink = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getFlinkConfiguration(),
-			NUM_TMS,
-			TM_SLOTS),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getFlinkConfiguration())
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(TM_SLOTS)
+			.build(),
 		true);
 
 	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
index 320dd5f..ab8d6b9 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.streaming.test.examples.windowing;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
 import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -42,10 +42,10 @@ public class TopSpeedWindowingExampleITCase extends TestLogger {
 
 	@ClassRule
 	public static MiniClusterResource miniClusterResource = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new Configuration(),
-			1,
-			1));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(1)
+			.build());
 
 	@Test
 	public void testTopSpeedWindowingExampleITCase() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
index 5d6a30f..ed4ec94 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -69,10 +70,10 @@ public class DistributedCacheDfsTest {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new org.apache.flink.configuration.Configuration(),
-			1,
-			1));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(1)
+			.build());
 
 	private static MiniDFSCluster hdfsCluster;
 	private static Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
index 2165152..194b657 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
@@ -18,9 +18,7 @@
 
 package org.apache.flink.ml.util
 
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.MiniClusterResource
-import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration
+import org.apache.flink.test.util.{MiniClusterResource, MiniClusterResourceConfiguration}
 import org.scalatest.{BeforeAndAfter, Suite}
 
 /** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests.
@@ -57,8 +55,10 @@ trait FlinkTestBase extends BeforeAndAfter {
 
   before {
     val cl = new MiniClusterResource(
-      new MiniClusterResourceConfiguration(new Configuration(), 1, parallelism)
-    )
+      new MiniClusterResourceConfiguration.Builder()
+        .setNumberTaskManagers(1)
+        .setNumberSlotsPerTaskManager(parallelism)
+        .build())
     
     cl.before()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 64b526d..ad2d509 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.TestLogger;
@@ -71,10 +72,11 @@ public class LocalExecutorITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfig(),
-			NUM_TMS,
-			NUM_SLOTS_PER_TM),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfig())
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+			.build(),
 		true);
 
 	private static ClusterClient<?> clusterClient;

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index c00b5d3..90ba6d4 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -57,14 +59,15 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests to verify JMX reporter functionality on the JobManager.
  */
-public class JMXJobManagerMetricTest {
+public class JMXJobManagerMetricTest extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			1,
-			1),
+		new MiniClusterResourceConfiguration.Builder()
+		.setConfiguration(getConfiguration())
+		.setNumberSlotsPerTaskManager(1)
+		.setNumberTaskManagers(1)
+		.build(),
 		true);
 
 	private static Configuration getConfiguration() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index a47045f..f977a8e 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 
 import org.apache.curator.test.TestingServer;
 import org.junit.AfterClass;
@@ -68,10 +69,11 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB
 		// we have to manage this manually because we have to create the ZooKeeper server
 		// ahead of this
 		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				getConfig(),
-				NUM_TMS,
-				NUM_SLOTS_PER_TM),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(getConfig())
+				.setNumberTaskManagers(NUM_TMS)
+				.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+				.build(),
 			true);
 
 		miniClusterResource.before();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index b1092c1..25f56d7 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 
 import org.apache.curator.test.TestingServer;
 import org.junit.AfterClass;
@@ -68,10 +69,11 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState
 		// we have to manage this manually because we have to create the ZooKeeper server
 		// ahead of this
 		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				getConfig(),
-				NUM_TMS,
-				NUM_SLOTS_PER_TM),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(getConfig())
+				.setNumberTaskManagers(NUM_TMS)
+				.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+				.build(),
 			true);
 
 		miniClusterResource.before();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index eb300c1..0d61e0d 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -52,10 +53,11 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfig(),
-			NUM_TMS,
-			NUM_SLOTS_PER_TM),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfig())
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+			.build(),
 		true);
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 3d6a3e3..002ff9f 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -52,10 +53,11 @@ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableSt
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfig(),
-			NUM_TMS,
-			NUM_SLOTS_PER_TM),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfig())
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+			.build(),
 		true);
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 994966e..776a267 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -80,10 +81,11 @@ public class WebFrontendITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			CLUSTER_CONFIGURATION,
-			NUM_TASK_MANAGERS,
-			NUM_SLOTS),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(CLUSTER_CONFIGURATION)
+			.setNumberTaskManagers(NUM_TASK_MANAGERS)
+			.setNumberSlotsPerTaskManager(NUM_SLOTS)
+			.build(),
 		true
 	);
 
@@ -153,7 +155,7 @@ public class WebFrontendITCase extends TestLogger {
 		if (notFoundJobConnection.getResponseCode() >= 400) {
 			// we don't set the content-encoding header
 			Assert.assertNull(notFoundJobConnection.getContentEncoding());
-			if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+			if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
 				Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
 			} else {
 				Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
@@ -281,7 +283,7 @@ public class WebFrontendITCase extends TestLogger {
 		final Deadline deadline = testTimeout.fromNow();
 
 		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
-			if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+			if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
 				// stop the job
 				client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
 				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
@@ -356,7 +358,7 @@ public class WebFrontendITCase extends TestLogger {
 			HttpTestClient.SimpleHttpResponse response = client
 				.getNextResponse(deadline.timeLeft());
 
-			if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+			if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
 				assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
 			} else {
 				assertEquals(HttpResponseStatus.OK, response.getStatus());

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index aefe4f1..d26620b 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.ClassRule;
@@ -62,12 +64,12 @@ public class JarRunHandlerTest {
 		config.setString(WebOptions.UPLOAD_DIR, uploadDir.toString());
 
 		MiniClusterResource clusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				1,
-				1
-			),
-			MiniClusterResource.MiniClusterType.NEW
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(1)
+				.build(),
+			TestBaseUtils.CodebaseType.NEW
 		);
 		clusterResource.before();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 9407af2..5d29fbb 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -75,12 +77,12 @@ public class HistoryServerTest extends TestLogger {
 		clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
 
 		cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				clusterConfig,
-				1,
-				1
-			),
-			MiniClusterResource.MiniClusterType.NEW
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(clusterConfig)
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(1)
+				.build(),
+			TestBaseUtils.CodebaseType.NEW
 		);
 		cluster.before();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index aca4fdb..b89617b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -230,7 +230,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 			final Configuration configuration = miniClusterConfiguration.getConfiguration();
 			final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
 			final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
-			final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED;
+			final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
 
 			try {
 				initializeIOFormatClasses(configuration);
@@ -916,7 +916,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	@Nonnull
 	private CompletionStage<Void> terminateRpcServices() {
 		final int numRpcServices;
-		if (miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED) {
+		if (miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED) {
 			numRpcServices = 1;
 		} else {
 			numRpcServices = 1 + 2 + miniClusterConfiguration.getNumTaskManagers(); // common, JM, RM, TMs
@@ -927,7 +927,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		synchronized (lock) {
 			rpcTerminationFutures.add(commonRpcService.stopService());
 
-			if (miniClusterConfiguration.getRpcServiceSharing() != MiniClusterConfiguration.RpcServiceSharing.SHARED) {
+			if (miniClusterConfiguration.getRpcServiceSharing() != RpcServiceSharing.SHARED) {
 				rpcTerminationFutures.add(jobManagerRpcService.stopService());
 				rpcTerminationFutures.add(resourceManagerRpcService.stopService());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 0a0c692..f728f24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
 
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.apache.flink.runtime.minicluster.MiniClusterConfiguration.RpcServiceSharing.SHARED;
+import static org.apache.flink.runtime.minicluster.RpcServiceSharing.SHARED;
 
 /**
  * Configuration object for the {@link MiniCluster}.
@@ -117,15 +117,6 @@ public class MiniClusterConfiguration {
 	// Enums
 	// ----------------------------------------------------------------------------------
 
-	/**
-	 * Enum which defines whether the mini cluster components use a shared RpcService
-	 * or whether every component gets its own dedicated RpcService started.
-	 */
-	public enum RpcServiceSharing {
-		SHARED, // a single shared rpc service
-		DEDICATED // every component gets his own dedicated rpc service
-	}
-
 	// ----------------------------------------------------------------------------------
 	// Builder
 	// ----------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java
new file mode 100644
index 0000000..7b33869
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+/**
+ * Enum which defines whether the mini cluster components use a shared RpcService
+ * or whether every component gets its own dedicated RpcService started.
+ */
+public enum RpcServiceSharing {
+	SHARED, // a single shared rpc service
+	DEDICATED // every component gets its own dedicated rpc service
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index a72e654..30ac84f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -54,7 +54,7 @@ public class MiniClusterITCase extends TestLogger {
 	@Test
 	public void runJobWithSingleRpcService() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
-			.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
+			.setRpcServiceSharing(RpcServiceSharing.SHARED)
 			.setConfiguration(configuration)
 			.build();
 
@@ -71,7 +71,7 @@ public class MiniClusterITCase extends TestLogger {
 	@Test
 	public void runJobWithMultipleRpcServices() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
-			.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.DEDICATED)
+			.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
 			.setConfiguration(configuration)
 			.build();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 12522a8..c6b43c2 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -24,7 +24,8 @@ import java.util.Objects
 import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
-import org.apache.flink.test.util.MiniClusterResource
+import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils.CodebaseType
 import org.apache.flink.util.TestLogger
 import org.junit._
 import org.junit.rules.TemporaryFolder
@@ -321,8 +322,7 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    val isNew = Objects.equals(MiniClusterResource.NEW_CODEBASE,
-      System.getProperty(MiniClusterResource.CODEBASE_KEY))
+    val isNew = TestBaseUtils.isNewCodebase()
     if (isNew) {
       configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
       // set to different than default so not to interfere with ScalaShellLocalStartupITCase

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index a971db8..f13f57b 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -19,11 +19,10 @@
 package org.apache.flink.api.scala
 
 import java.io._
-import java.util.Objects
 
 import org.apache.flink.configuration.{Configuration, CoreOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.test.util.MiniClusterResource
+import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
 import org.junit.rules.TemporaryFolder
 import org.junit.{Assert, Rule, Test}
@@ -87,8 +86,7 @@ class ScalaShellLocalStartupITCase extends TestLogger {
     System.setOut(new PrintStream(baos))
 
     val configuration = new Configuration()
-    val mode = if (Objects.equals(MiniClusterResource.NEW_CODEBASE,
-      System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+    val mode = if (TestBaseUtils.isNewCodebase()) {
       CoreOptions.NEW_MODE
     } else {
       CoreOptions.LEGACY_MODE

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 65b351d..0b7a3b3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.util;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FileUtils;
 
 import org.junit.ClassRule;
@@ -60,10 +59,10 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 
 	@ClassRule
 	public static MiniClusterResource miniClusterResource = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new Configuration(),
-			1,
-			DEFAULT_PARALLELISM));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+			.build());
 
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index cbe329c..c412bbf 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -18,18 +18,10 @@
 
 package org.apache.flink.test.util;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.configuration.*;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -38,13 +30,11 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
-
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -56,15 +46,11 @@ public class MiniClusterResource extends ExternalResource {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class);
 
-	public static final String CODEBASE_KEY = "codebase";
-
-	public static final String NEW_CODEBASE = "new";
-
 	private final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
 
-	private final MiniClusterType miniClusterType;
+	private final TestBaseUtils.CodebaseType miniClusterType;
 
 	private JobExecutorService jobExecutorService;
 
@@ -85,30 +71,30 @@ public class MiniClusterResource extends ExternalResource {
 	}
 
 	public MiniClusterResource(
-			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
-			final MiniClusterType miniClusterType) {
+		final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+		final TestBaseUtils.CodebaseType miniClusterType) {
 		this(miniClusterResourceConfiguration, miniClusterType, false);
 	}
 
 	public MiniClusterResource(
-			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
-			final boolean enableClusterClient) {
+		final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+		final boolean enableClusterClient) {
 		this(
 			miniClusterResourceConfiguration,
-			Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? MiniClusterType.NEW : MiniClusterType.LEGACY,
+			miniClusterResourceConfiguration.getCodebaseType(),
 			enableClusterClient);
 	}
 
 	private MiniClusterResource(
-			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
-			final MiniClusterType miniClusterType,
-			final boolean enableClusterClient) {
+		final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+		final TestBaseUtils.CodebaseType miniClusterType,
+		final boolean enableClusterClient) {
 		this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
 		this.miniClusterType = Preconditions.checkNotNull(miniClusterType);
 		this.enableClusterClient = enableClusterClient;
 	}
 
-	public MiniClusterType getMiniClusterType() {
+	public TestBaseUtils.CodebaseType getMiniClusterType() {
 		return miniClusterType;
 	}
 
@@ -187,7 +173,7 @@ public class MiniClusterResource extends ExternalResource {
 		}
 	}
 
-	private void startJobExecutorService(MiniClusterType miniClusterType) throws Exception {
+	private void startJobExecutorService(TestBaseUtils.CodebaseType miniClusterType) throws Exception {
 		switch (miniClusterType) {
 			case LEGACY:
 				startLegacyMiniCluster();
@@ -196,7 +182,7 @@ public class MiniClusterResource extends ExternalResource {
 				startMiniCluster();
 				break;
 			default:
-				throw new FlinkRuntimeException("Unknown MiniClusterType "  + miniClusterType + '.');
+				throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.');
 		}
 	}
 
@@ -264,67 +250,4 @@ public class MiniClusterResource extends ExternalResource {
 
 		webUIPort = miniCluster.getRestAddress().getPort();
 	}
-
-	/**
-	 * Mini cluster resource configuration object.
-	 */
-	public static class MiniClusterResourceConfiguration {
-		private final Configuration configuration;
-
-		private final int numberTaskManagers;
-
-		private final int numberSlotsPerTaskManager;
-
-		private final Time shutdownTimeout;
-
-		public MiniClusterResourceConfiguration(
-				Configuration configuration,
-				int numberTaskManagers,
-				int numberSlotsPerTaskManager) {
-			this(
-				configuration,
-				numberTaskManagers,
-				numberSlotsPerTaskManager,
-				AkkaUtils.getTimeoutAsTime(configuration));
-		}
-
-		public MiniClusterResourceConfiguration(
-				Configuration configuration,
-				int numberTaskManagers,
-				int numberSlotsPerTaskManager,
-				Time shutdownTimeout) {
-			this.configuration = Preconditions.checkNotNull(configuration);
-			this.numberTaskManagers = numberTaskManagers;
-			this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
-			this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
-		}
-
-		public Configuration getConfiguration() {
-			return configuration;
-		}
-
-		public int getNumberTaskManagers() {
-			return numberTaskManagers;
-		}
-
-		public int getNumberSlotsPerTaskManager() {
-			return numberSlotsPerTaskManager;
-		}
-
-		public Time getShutdownTimeout() {
-			return shutdownTimeout;
-		}
-	}
-
-	// ---------------------------------------------
-	// Enum definitions
-	// ---------------------------------------------
-
-	/**
-	 * Type of the mini cluster to start.
-	 */
-	public enum MiniClusterType {
-		LEGACY,
-		NEW
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
new file mode 100644
index 0000000..c938920
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Mini cluster resource configuration object.
+ */
+public class MiniClusterResourceConfiguration {
+
+	private final Configuration configuration;
+
+	private final int numberTaskManagers;
+
+	private final int numberSlotsPerTaskManager;
+
+	private final Time shutdownTimeout;
+
+	private final TestBaseUtils.CodebaseType codebaseType;
+
+	private final RpcServiceSharing rpcServiceSharing;
+
+	MiniClusterResourceConfiguration(
+		Configuration configuration,
+		int numberTaskManagers,
+		int numberSlotsPerTaskManager,
+		Time shutdownTimeout,
+		TestBaseUtils.CodebaseType codebaseType,
+		RpcServiceSharing rpcServiceSharing) {
+		this.configuration = Preconditions.checkNotNull(configuration);
+		this.numberTaskManagers = numberTaskManagers;
+		this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
+		this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
+		this.codebaseType = Preconditions.checkNotNull(codebaseType);
+		this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
+	}
+
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+
+	public int getNumberTaskManagers() {
+		return numberTaskManagers;
+	}
+
+	public int getNumberSlotsPerTaskManager() {
+		return numberSlotsPerTaskManager;
+	}
+
+	public Time getShutdownTimeout() {
+		return shutdownTimeout;
+	}
+
+	/**
+	 * @deprecated Will be irrelevant once the legacy mode has been removed.
+	 */
+	@Deprecated
+	public TestBaseUtils.CodebaseType getCodebaseType() {
+		return codebaseType;
+	}
+
+	public RpcServiceSharing getRpcServiceSharing() {
+		return rpcServiceSharing;
+	}
+
+	/**
+	 * Builder for {@link MiniClusterResourceConfiguration}.
+	 */
+	public static final class Builder {
+
+		private Configuration configuration = new Configuration();
+		private int numberTaskManagers = 1;
+		private int numberSlotsPerTaskManager = 1;
+		private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration);
+		private TestBaseUtils.CodebaseType codebaseType = TestBaseUtils.getCodebaseType();
+
+		private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED;
+
+		public Builder setConfiguration(Configuration configuration) {
+			this.configuration = configuration;
+			return this;
+		}
+
+		public Builder setNumberTaskManagers(int numberTaskManagers) {
+			this.numberTaskManagers = numberTaskManagers;
+			return this;
+		}
+
+		public Builder setNumberSlotsPerTaskManager(int numberSlotsPerTaskManager) {
+			this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
+			return this;
+		}
+
+		public Builder setShutdownTimeout(Time shutdownTimeout) {
+			this.shutdownTimeout = shutdownTimeout;
+			return this;
+		}
+
+		/**
+		 * @deprecated Will be irrelevant once the legacy mode has been removed.
+		 */
+		@Deprecated
+		public Builder setCodebaseType(TestBaseUtils.CodebaseType codebaseType) {
+			this.codebaseType = codebaseType;
+			return this;
+		}
+
+		public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) {
+			this.rpcServiceSharing = rpcServiceSharing;
+			return this;
+		}
+
+		public MiniClusterResourceConfiguration build() {
+			return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, codebaseType, rpcServiceSharing);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 7e9b12e..10fe1f0 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -44,6 +44,8 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.File;
@@ -103,6 +105,9 @@ public class TestBaseUtils extends TestLogger {
 
 	public static final Time DEFAULT_HTTP_TIMEOUT = Time.seconds(10L);
 
+	static final String NEW_CODEBASE = "new";
+	static final String CODEBASE_KEY = "codebase";
+
 	// ------------------------------------------------------------------------
 
 	protected static File logDir;
@@ -673,6 +678,26 @@ public class TestBaseUtils extends TestLogger {
 		throw new TimeoutException("Could not get HTTP response in time since the service is still unavailable.");
 	}
 
+	@Nonnull
+	public static CodebaseType getCodebaseType() {
+		return Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? CodebaseType.NEW : CodebaseType.LEGACY;
+	}
+
+	public static boolean isNewCodebase() {
+		return CodebaseType.NEW == getCodebaseType();
+	}
+
+	/**
+	 * Type of the codebase under test, which determines the kind of mini cluster to start.
+	 *
+	 * @deprecated Will be irrelevant once the legacy mode has been removed.
+	 */
+	@Deprecated
+	public enum CodebaseType {
+		LEGACY,
+		NEW
+	}
+
 	/**
 	 * Comparator for comparable Tuples.
 	 * @param <T> tuple type

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index 3d90833..d186e36 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -50,10 +51,10 @@ public class AccumulatorErrorITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			3));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(3)
+			.build());
 
 	public static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 347c2e3..5944d3a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -84,10 +85,11 @@ public class AccumulatorLiveITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			1,
-			1),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(1)
+			.build(),
 		true);
 
 	private static Configuration getConfiguration() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index cac16f0..871df7d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -56,10 +57,11 @@ public abstract class CancelingTestBase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			4),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(4)
+			.build(),
 		true);
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 61baefa..a1e31e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -106,10 +107,11 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	protected final MiniClusterResource getMiniClusterResource() {
 		return new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				getConfigurationSafe(),
-				2,
-				PARALLELISM / 2));
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(getConfigurationSafe())
+				.setNumberTaskManagers(2)
+				.setNumberSlotsPerTaskManager(PARALLELISM / 2)
+				.build());
 	}
 
 	private Configuration getConfigurationSafe() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 07167a9..b82abb9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -66,10 +67,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			PARALLELISM / 2));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(PARALLELISM / 2)
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index e5868c3..caa1791 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
@@ -79,10 +80,11 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			NUM_TASK_MANAGERS,
-			NUM_TASK_SLOTS));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(NUM_TASK_MANAGERS)
+			.setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 55631a2..e7fcefd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -52,6 +52,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -127,10 +128,11 @@ public class RescalingITCase extends TestLogger {
 			config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 			cluster = new MiniClusterResource(
-				new MiniClusterResource.MiniClusterResourceConfiguration(
-					config,
-					numTaskManagers,
-					slotsPerTaskManager),
+				new MiniClusterResourceConfiguration.Builder()
+					.setConfiguration(config)
+					.setNumberTaskManagers(numTaskManagers)
+					.setNumberSlotsPerTaskManager(slotsPerTaskManager)
+					.build(),
 				true);
 			cluster.before();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index aebaa63..f4fe8e7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.state.ManualWindowSpeedITCase;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.test.TestingServer;
@@ -263,10 +264,11 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 		}
 
 		MiniClusterResource cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				NUM_TASK_MANAGERS,
-				SLOTS_PER_TASK_MANAGER),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(NUM_TASK_MANAGERS)
+				.setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+				.build(),
 			true);
 
 		cluster.before();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2805144..b46c485 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -227,11 +228,11 @@ public class SavepointITCase extends TestLogger {
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 		MiniClusterResource cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				numTaskManagers,
-				numSlotsPerTaskManager
-			),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(numTaskManagers)
+				.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+				.build(),
 			true);
 		cluster.before();
 		ClusterClient<?> client = cluster.getClusterClient();
@@ -296,11 +297,11 @@ public class SavepointITCase extends TestLogger {
 
 		// Start Flink
 		MiniClusterResource cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				numTaskManagers,
-				numSlotsPerTaskManager
-			),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(numTaskManagers)
+				.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+				.build(),
 			true);
 
 		LOG.info("Shutting down Flink cluster.");
@@ -341,11 +342,11 @@ public class SavepointITCase extends TestLogger {
 		// create a new TestingCluster to make sure we start with completely
 		// new resources
 		cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				numTaskManagers,
-				numSlotsPerTaskManager
-			),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(numTaskManagers)
+				.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+				.build(),
 			true);
 		LOG.info("Restarting Flink cluster.");
 		cluster.before();
@@ -578,11 +579,11 @@ public class SavepointITCase extends TestLogger {
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 		MiniClusterResource cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				1,
-				2 * jobGraph.getMaximumParallelism()
-			),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(2 * jobGraph.getMaximumParallelism())
+				.build(),
 			true);
 		cluster.before();
 		ClusterClient<?> client = cluster.getClusterClient();
@@ -708,11 +709,11 @@ public class SavepointITCase extends TestLogger {
 
 		MiniClusterResource get() {
 			return new MiniClusterResource(
-				new MiniClusterResource.MiniClusterResourceConfiguration(
-					config,
-					numTaskManagers,
-					numSlotsPerTaskManager
-				),
+				new MiniClusterResourceConfiguration.Builder()
+					.setConfiguration(config)
+					.setNumberTaskManagers(numTaskManagers)
+					.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+					.build(),
 				true);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 5ac1214..0ed9d6b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -19,9 +19,9 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -42,10 +42,10 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new Configuration(),
-			NUM_TASK_MANAGERS,
-			NUM_TASK_SLOTS));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(NUM_TASK_MANAGERS)
+			.setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+			.build());
 
 	/**
 	 * Implementations are expected to assemble the test topology in this function

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index bc73717..1b93684 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -74,10 +75,11 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			PARALLELISM / 2));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(PARALLELISM / 2)
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 156d448..6fb18e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -109,10 +110,11 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 		// we have to manage this manually because we have to create the ZooKeeper server
 		// ahead of this
 		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				NUM_TMS,
-				NUM_SLOTS_PER_TM),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(NUM_TMS)
+				.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+				.build(),
 			true);
 
 		miniClusterResource.before();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 84cb88a..0d43aa9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.OptionalFailure;
 
@@ -88,10 +89,11 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 
 	protected SavepointMigrationTestBase() throws Exception {
 		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				getConfiguration(),
-				1,
-				DEFAULT_PARALLELISM),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(getConfiguration())
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+				.build(),
 			true);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index d8ed2ee..93bec74 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -53,12 +55,11 @@ public class JobRetrievalITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new Configuration(),
-			1,
-			4
-		),
-		MiniClusterResource.MiniClusterType.NEW
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(4)
+			.build(),
+		TestBaseUtils.CodebaseType.NEW
 	);
 
 	private RestClusterClient<StandaloneClusterId> client;

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index ecd16a1..5ca9ba2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -51,10 +52,11 @@ public class JobSubmissionFailsITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			NUM_TM,
-			NUM_SLOTS / NUM_TM),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(NUM_TM)
+			.setNumberSlotsPerTaskManager(NUM_SLOTS / NUM_TM)
+			.build(),
 		true);
 
 	private static Configuration getConfiguration() {