You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/22 15:45:34 UTC

[1/9] flink git commit: [FLINK-9646] [tests] Harden ExecutionGraphCoLocationRestartTest.testConstraintsAfterRestart

Repository: flink
Updated Branches:
  refs/heads/master bf7e1010b -> 8561f4cce


[FLINK-9646] [tests] Harden ExecutionGraphCoLocationRestartTest.testConstraintsAfterRestart

Wait not only for resource assignment but until all Executions have reached
the state DEPLOYING. That way it is assured that no Execution is still in state
SCHEDULED, from which it would transition directly into the state CANCELED when
calling Execution.cancel.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8561f4cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8561f4cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8561f4cc

Branch: refs/heads/master
Commit: 8561f4cce47bda416d7c09071ea18515ab2c75e1
Parents: 5372813
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jun 22 17:40:49 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200

----------------------------------------------------------------------
 .../executiongraph/ExecutionGraphCoLocationRestartTest.java | 9 +++++++--
 .../runtime/executiongraph/ExecutionGraphTestUtils.java     | 6 +++++-
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8561f4cc/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
index aba6bfa..6c5a446 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -34,6 +35,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.function.Predicate;
+
 import static org.apache.flink.runtime.jobgraph.JobStatus.FINISHED;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -85,9 +88,11 @@ public class ExecutionGraphCoLocationRestartTest extends SchedulerTestBase {
 
 		eg.scheduleForExecution();
 
+		Predicate<Execution> isDeploying = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
+
 		ExecutionGraphTestUtils.waitForAllExecutionsPredicate(
 			eg,
-			ExecutionGraphTestUtils.hasResourceAssigned,
+			isDeploying,
 			timeout);
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
@@ -102,7 +107,7 @@ public class ExecutionGraphCoLocationRestartTest extends SchedulerTestBase {
 
 		ExecutionGraphTestUtils.waitForAllExecutionsPredicate(
 			eg,
-			ExecutionGraphTestUtils.hasResourceAssigned,
+			isDeploying,
 			timeout);
 
 		//checking execution vertex properties

http://git-wip-us.apache.org/repos/asf/flink/blob/8561f4cc/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 9cfe90e..ad8d0aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -193,7 +193,11 @@ public class ExecutionGraphTestUtils {
 	/**
 	 * Predicate which is true if the given {@link Execution} has a resource assigned.
 	 */
-	public static final Predicate<Execution> hasResourceAssigned = (Execution execution) -> execution.getAssignedResource() != null;
+	static final Predicate<Execution> hasResourceAssigned = (Execution execution) -> execution.getAssignedResource() != null;
+
+	static Predicate<Execution> isInExecutionState(ExecutionState executionState) {
+		return (Execution execution) -> execution.getState() == executionState;
+	}
 
 	public static void waitUntilFailoverRegionState(FailoverRegion region, JobStatus status, long maxWaitMillis)
 			throws TimeoutException {


[6/9] flink git commit: [hotfix] Introduce builder for MiniClusterResourceConfiguration

Posted by tr...@apache.org.
[hotfix] Introduce builder for MiniClusterResourceConfiguration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6de6ed48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6de6ed48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6de6ed48

Branch: refs/heads/master
Commit: 6de6ed48532a31a7ba90338ce7e5c2bc61d0ae11
Parents: bf7e101
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 21 14:57:45 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/LocalExecutor.java  |   3 +-
 .../connectors/fs/RollingSinkITCase.java        |   9 +-
 .../connectors/fs/RollingSinkSecuredITCase.java |  11 +-
 .../kafka/KafkaShortRetentionTestBase.java      |  10 +-
 .../connectors/kafka/KafkaTestBase.java         |  10 +-
 .../TopSpeedWindowingExampleITCase.java         |  10 +-
 .../hdfstests/DistributedCacheDfsTest.java      |   9 +-
 .../apache/flink/ml/util/FlinkTestBase.scala    |  10 +-
 .../gateway/local/LocalExecutorITCase.java      |  10 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |  13 +-
 .../HAQueryableStateFsBackendITCase.java        |  10 +-
 .../HAQueryableStateRocksDBBackendITCase.java   |  10 +-
 .../NonHAQueryableStateFsBackendITCase.java     |  10 +-
 ...NonHAQueryableStateRocksDBBackendITCase.java |  10 +-
 .../runtime/webmonitor/WebFrontendITCase.java   |  16 ++-
 .../webmonitor/handlers/JarRunHandlerTest.java  |  14 +-
 .../webmonitor/history/HistoryServerTest.java   |  14 +-
 .../flink/runtime/minicluster/MiniCluster.java  |   6 +-
 .../minicluster/MiniClusterConfiguration.java   |  11 +-
 .../runtime/minicluster/RpcServiceSharing.java  |  28 ++++
 .../runtime/minicluster/MiniClusterITCase.java  |   4 +-
 .../flink/api/scala/ScalaShellITCase.scala      |   6 +-
 .../scala/ScalaShellLocalStartupITCase.scala    |   6 +-
 .../flink/test/util/AbstractTestBase.java       |   9 +-
 .../flink/test/util/MiniClusterResource.java    | 103 ++------------
 .../util/MiniClusterResourceConfiguration.java  | 138 +++++++++++++++++++
 .../apache/flink/test/util/TestBaseUtils.java   |  25 ++++
 .../accumulators/AccumulatorErrorITCase.java    |   9 +-
 .../accumulators/AccumulatorLiveITCase.java     |  10 +-
 .../test/cancelling/CancelingTestBase.java      |  10 +-
 ...tractEventTimeWindowCheckpointingITCase.java |  10 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |  10 +-
 .../KeyedStateCheckpointingITCase.java          |  10 +-
 .../test/checkpointing/RescalingITCase.java     |  10 +-
 .../ResumeCheckpointManuallyITCase.java         |  10 +-
 .../test/checkpointing/SavepointITCase.java     |  51 +++----
 .../StreamFaultToleranceTestBase.java           |  10 +-
 .../WindowCheckpointingITCase.java              |  10 +-
 .../ZooKeeperHighAvailabilityITCase.java        |  10 +-
 .../utils/SavepointMigrationTestBase.java       |  10 +-
 .../test/example/client/JobRetrievalITCase.java |  13 +-
 .../failing/JobSubmissionFailsITCase.java       |  10 +-
 .../manual/StreamingScalabilityAndLatency.java  |  10 +-
 .../flink/test/misc/AutoParallelismITCase.java  |  15 +-
 .../test/misc/CustomSerializationITCase.java    |  10 +-
 .../test/misc/MiscellaneousIssuesITCase.java    |   9 +-
 ...SuccessAfterNetworkBuffersFailureITCase.java |  10 +-
 .../operators/CustomDistributionITCase.java     |  10 +-
 ...SimpleRecoveryFailureRateStrategyITBase.java |   9 +-
 ...RecoveryFixedDelayRestartStrategyITBase.java |  10 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java |  10 +-
 .../flink/test/runtime/NettyEpollITCase.java    |  11 +-
 .../runtime/NetworkStackThroughputITCase.java   |  11 +-
 .../AbstractOperatorRestoreTestBase.java        |  11 +-
 .../test/streaming/runtime/TimestampITCase.java |  10 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   8 +-
 56 files changed, 509 insertions(+), 343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index f837c4f..4e4993a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 
 import java.util.List;
 
@@ -137,7 +138,7 @@ public class LocalExecutor extends PlanExecutor {
 					configuration.getInteger(
 						ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
 						ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
-				.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
+				.setRpcServiceSharing(RpcServiceSharing.SHARED)
 				.setNumSlotsPerTaskManager(
 					configuration.getInteger(
 						TaskManagerOptions.NUM_TASK_SLOTS, 1))

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 78b53d8..93f6d52 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
@@ -118,10 +119,10 @@ public class RollingSinkITCase extends TestLogger {
 				+ "/";
 
 		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				new org.apache.flink.configuration.Configuration(),
-				1,
-				4));
+			new MiniClusterResourceConfiguration.Builder()
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(4)
+				.build());
 
 		miniClusterResource.before();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index b9564ee..7296269 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.test.util.TestingSecurityContext;
@@ -147,10 +148,12 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 
 		Configuration configuration = startSecureFlinkClusterWithRecoveryModeEnabled();
 
-		miniClusterResource = new MiniClusterResource(new MiniClusterResource.MiniClusterResourceConfiguration(
-			configuration,
-			1,
-			4));
+		miniClusterResource = new MiniClusterResource(
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(configuration)
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(4)
+				.build());
 
 		miniClusterResource.before();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index f488325..1e3e1fc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.AfterClass;
@@ -70,10 +71,11 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
 	@ClassRule
 	public static MiniClusterResource flink = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			NUM_TMS,
-			TM_SLOTS));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(TM_SLOTS)
+			.build());
 
 	@ClassRule
 	public static TemporaryFolder tempFolder = new TemporaryFolder();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 697e075..806c90c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -81,10 +82,11 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	@ClassRule
 	public static MiniClusterResource flink = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getFlinkConfiguration(),
-			NUM_TMS,
-			TM_SLOTS),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getFlinkConfiguration())
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(TM_SLOTS)
+			.build(),
 		true);
 
 	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
index 320dd5f..ab8d6b9 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.streaming.test.examples.windowing;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
 import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -42,10 +42,10 @@ public class TopSpeedWindowingExampleITCase extends TestLogger {
 
 	@ClassRule
 	public static MiniClusterResource miniClusterResource = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new Configuration(),
-			1,
-			1));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(1)
+			.build());
 
 	@Test
 	public void testTopSpeedWindowingExampleITCase() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
index 5d6a30f..ed4ec94 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -69,10 +70,10 @@ public class DistributedCacheDfsTest {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new org.apache.flink.configuration.Configuration(),
-			1,
-			1));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(1)
+			.build());
 
 	private static MiniDFSCluster hdfsCluster;
 	private static Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
index 2165152..194b657 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala
@@ -18,9 +18,7 @@
 
 package org.apache.flink.ml.util
 
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.MiniClusterResource
-import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration
+import org.apache.flink.test.util.{MiniClusterResource, MiniClusterResourceConfiguration}
 import org.scalatest.{BeforeAndAfter, Suite}
 
 /** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests.
@@ -57,8 +55,10 @@ trait FlinkTestBase extends BeforeAndAfter {
 
   before {
     val cl = new MiniClusterResource(
-      new MiniClusterResourceConfiguration(new Configuration(), 1, parallelism)
-    )
+      new MiniClusterResourceConfiguration.Builder()
+        .setNumberTaskManagers(1)
+        .setNumberSlotsPerTaskManager(parallelism)
+        .build())
     
     cl.before()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 64b526d..ad2d509 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.TestLogger;
@@ -71,10 +72,11 @@ public class LocalExecutorITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfig(),
-			NUM_TMS,
-			NUM_SLOTS_PER_TM),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfig())
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+			.build(),
 		true);
 
 	private static ClusterClient<?> clusterClient;

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index c00b5d3..90ba6d4 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -57,14 +59,15 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests to verify JMX reporter functionality on the JobManager.
  */
-public class JMXJobManagerMetricTest {
+public class JMXJobManagerMetricTest extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			1,
-			1),
+		new MiniClusterResourceConfiguration.Builder()
+		.setConfiguration(getConfiguration())
+		.setNumberSlotsPerTaskManager(1)
+		.setNumberTaskManagers(1)
+		.build(),
 		true);
 
 	private static Configuration getConfiguration() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index a47045f..f977a8e 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 
 import org.apache.curator.test.TestingServer;
 import org.junit.AfterClass;
@@ -68,10 +69,11 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB
 		// we have to manage this manually because we have to create the ZooKeeper server
 		// ahead of this
 		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				getConfig(),
-				NUM_TMS,
-				NUM_SLOTS_PER_TM),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(getConfig())
+				.setNumberTaskManagers(NUM_TMS)
+				.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+				.build(),
 			true);
 
 		miniClusterResource.before();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index b1092c1..25f56d7 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 
 import org.apache.curator.test.TestingServer;
 import org.junit.AfterClass;
@@ -68,10 +69,11 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState
 		// we have to manage this manually because we have to create the ZooKeeper server
 		// ahead of this
 		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				getConfig(),
-				NUM_TMS,
-				NUM_SLOTS_PER_TM),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(getConfig())
+				.setNumberTaskManagers(NUM_TMS)
+				.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+				.build(),
 			true);
 
 		miniClusterResource.before();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index eb300c1..0d61e0d 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -52,10 +53,11 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfig(),
-			NUM_TMS,
-			NUM_SLOTS_PER_TM),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfig())
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+			.build(),
 		true);
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 3d6a3e3..002ff9f 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -52,10 +53,11 @@ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableSt
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfig(),
-			NUM_TMS,
-			NUM_SLOTS_PER_TM),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfig())
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+			.build(),
 		true);
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 994966e..776a267 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -80,10 +81,11 @@ public class WebFrontendITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			CLUSTER_CONFIGURATION,
-			NUM_TASK_MANAGERS,
-			NUM_SLOTS),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(CLUSTER_CONFIGURATION)
+			.setNumberTaskManagers(NUM_TASK_MANAGERS)
+			.setNumberSlotsPerTaskManager(NUM_SLOTS)
+			.build(),
 		true
 	);
 
@@ -153,7 +155,7 @@ public class WebFrontendITCase extends TestLogger {
 		if (notFoundJobConnection.getResponseCode() >= 400) {
 			// we don't set the content-encoding header
 			Assert.assertNull(notFoundJobConnection.getContentEncoding());
-			if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+			if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
 				Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
 			} else {
 				Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
@@ -281,7 +283,7 @@ public class WebFrontendITCase extends TestLogger {
 		final Deadline deadline = testTimeout.fromNow();
 
 		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
-			if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+			if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
 				// stop the job
 				client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
 				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
@@ -356,7 +358,7 @@ public class WebFrontendITCase extends TestLogger {
 			HttpTestClient.SimpleHttpResponse response = client
 				.getNextResponse(deadline.timeLeft());
 
-			if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+			if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
 				assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
 			} else {
 				assertEquals(HttpResponseStatus.OK, response.getStatus());

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index aefe4f1..d26620b 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.ClassRule;
@@ -62,12 +64,12 @@ public class JarRunHandlerTest {
 		config.setString(WebOptions.UPLOAD_DIR, uploadDir.toString());
 
 		MiniClusterResource clusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				1,
-				1
-			),
-			MiniClusterResource.MiniClusterType.NEW
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(1)
+				.build(),
+			TestBaseUtils.CodebaseType.NEW
 		);
 		clusterResource.before();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 9407af2..5d29fbb 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -75,12 +77,12 @@ public class HistoryServerTest extends TestLogger {
 		clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
 
 		cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				clusterConfig,
-				1,
-				1
-			),
-			MiniClusterResource.MiniClusterType.NEW
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(clusterConfig)
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(1)
+				.build(),
+			TestBaseUtils.CodebaseType.NEW
 		);
 		cluster.before();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index aca4fdb..b89617b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -230,7 +230,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 			final Configuration configuration = miniClusterConfiguration.getConfiguration();
 			final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
 			final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
-			final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED;
+			final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
 
 			try {
 				initializeIOFormatClasses(configuration);
@@ -916,7 +916,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	@Nonnull
 	private CompletionStage<Void> terminateRpcServices() {
 		final int numRpcServices;
-		if (miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED) {
+		if (miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED) {
 			numRpcServices = 1;
 		} else {
 			numRpcServices = 1 + 2 + miniClusterConfiguration.getNumTaskManagers(); // common, JM, RM, TMs
@@ -927,7 +927,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		synchronized (lock) {
 			rpcTerminationFutures.add(commonRpcService.stopService());
 
-			if (miniClusterConfiguration.getRpcServiceSharing() != MiniClusterConfiguration.RpcServiceSharing.SHARED) {
+			if (miniClusterConfiguration.getRpcServiceSharing() != RpcServiceSharing.SHARED) {
 				rpcTerminationFutures.add(jobManagerRpcService.stopService());
 				rpcTerminationFutures.add(resourceManagerRpcService.stopService());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 0a0c692..f728f24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
 
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.apache.flink.runtime.minicluster.MiniClusterConfiguration.RpcServiceSharing.SHARED;
+import static org.apache.flink.runtime.minicluster.RpcServiceSharing.SHARED;
 
 /**
  * Configuration object for the {@link MiniCluster}.
@@ -117,15 +117,6 @@ public class MiniClusterConfiguration {
 	// Enums
 	// ----------------------------------------------------------------------------------
 
-	/**
-	 * Enum which defines whether the mini cluster components use a shared RpcService
-	 * or whether every component gets its own dedicated RpcService started.
-	 */
-	public enum RpcServiceSharing {
-		SHARED, // a single shared rpc service
-		DEDICATED // every component gets his own dedicated rpc service
-	}
-
 	// ----------------------------------------------------------------------------------
 	// Builder
 	// ----------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java
new file mode 100644
index 0000000..7b33869
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+/**
+ * Enum which defines whether the mini cluster components use a shared RpcService
+ * or whether every component gets its own dedicated RpcService started.
+ */
+public enum RpcServiceSharing {
+	SHARED, // a single shared rpc service
+	DEDICATED // every component gets its own dedicated rpc service
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index a72e654..30ac84f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -54,7 +54,7 @@ public class MiniClusterITCase extends TestLogger {
 	@Test
 	public void runJobWithSingleRpcService() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
-			.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
+			.setRpcServiceSharing(RpcServiceSharing.SHARED)
 			.setConfiguration(configuration)
 			.build();
 
@@ -71,7 +71,7 @@ public class MiniClusterITCase extends TestLogger {
 	@Test
 	public void runJobWithMultipleRpcServices() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
-			.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.DEDICATED)
+			.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
 			.setConfiguration(configuration)
 			.build();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 12522a8..c6b43c2 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -24,7 +24,8 @@ import java.util.Objects
 import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
-import org.apache.flink.test.util.MiniClusterResource
+import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils.CodebaseType
 import org.apache.flink.util.TestLogger
 import org.junit._
 import org.junit.rules.TemporaryFolder
@@ -321,8 +322,7 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    val isNew = Objects.equals(MiniClusterResource.NEW_CODEBASE,
-      System.getProperty(MiniClusterResource.CODEBASE_KEY))
+    val isNew = TestBaseUtils.isNewCodebase()
     if (isNew) {
       configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
       // set to different than default so not to interfere with ScalaShellLocalStartupITCase

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index a971db8..f13f57b 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -19,11 +19,10 @@
 package org.apache.flink.api.scala
 
 import java.io._
-import java.util.Objects
 
 import org.apache.flink.configuration.{Configuration, CoreOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.test.util.MiniClusterResource
+import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
 import org.junit.rules.TemporaryFolder
 import org.junit.{Assert, Rule, Test}
@@ -87,8 +86,7 @@ class ScalaShellLocalStartupITCase extends TestLogger {
     System.setOut(new PrintStream(baos))
 
     val configuration = new Configuration()
-    val mode = if (Objects.equals(MiniClusterResource.NEW_CODEBASE,
-      System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+    val mode = if (TestBaseUtils.isNewCodebase()) {
       CoreOptions.NEW_MODE
     } else {
       CoreOptions.LEGACY_MODE

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 65b351d..0b7a3b3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.util;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FileUtils;
 
 import org.junit.ClassRule;
@@ -60,10 +59,10 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 
 	@ClassRule
 	public static MiniClusterResource miniClusterResource = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new Configuration(),
-			1,
-			DEFAULT_PARALLELISM));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+			.build());
 
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index cbe329c..c412bbf 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -18,18 +18,10 @@
 
 package org.apache.flink.test.util;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.configuration.*;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -38,13 +30,11 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
-
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -56,15 +46,11 @@ public class MiniClusterResource extends ExternalResource {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class);
 
-	public static final String CODEBASE_KEY = "codebase";
-
-	public static final String NEW_CODEBASE = "new";
-
 	private final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
 
-	private final MiniClusterType miniClusterType;
+	private final TestBaseUtils.CodebaseType miniClusterType;
 
 	private JobExecutorService jobExecutorService;
 
@@ -85,30 +71,30 @@ public class MiniClusterResource extends ExternalResource {
 	}
 
 	public MiniClusterResource(
-			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
-			final MiniClusterType miniClusterType) {
+		final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+		final TestBaseUtils.CodebaseType miniClusterType) {
 		this(miniClusterResourceConfiguration, miniClusterType, false);
 	}
 
 	public MiniClusterResource(
-			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
-			final boolean enableClusterClient) {
+		final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+		final boolean enableClusterClient) {
 		this(
 			miniClusterResourceConfiguration,
-			Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? MiniClusterType.NEW : MiniClusterType.LEGACY,
+			miniClusterResourceConfiguration.getCodebaseType(),
 			enableClusterClient);
 	}
 
 	private MiniClusterResource(
-			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
-			final MiniClusterType miniClusterType,
-			final boolean enableClusterClient) {
+		final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+		final TestBaseUtils.CodebaseType miniClusterType,
+		final boolean enableClusterClient) {
 		this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
 		this.miniClusterType = Preconditions.checkNotNull(miniClusterType);
 		this.enableClusterClient = enableClusterClient;
 	}
 
-	public MiniClusterType getMiniClusterType() {
+	public TestBaseUtils.CodebaseType getMiniClusterType() {
 		return miniClusterType;
 	}
 
@@ -187,7 +173,7 @@ public class MiniClusterResource extends ExternalResource {
 		}
 	}
 
-	private void startJobExecutorService(MiniClusterType miniClusterType) throws Exception {
+	private void startJobExecutorService(TestBaseUtils.CodebaseType miniClusterType) throws Exception {
 		switch (miniClusterType) {
 			case LEGACY:
 				startLegacyMiniCluster();
@@ -196,7 +182,7 @@ public class MiniClusterResource extends ExternalResource {
 				startMiniCluster();
 				break;
 			default:
-				throw new FlinkRuntimeException("Unknown MiniClusterType "  + miniClusterType + '.');
+				throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.');
 		}
 	}
 
@@ -264,67 +250,4 @@ public class MiniClusterResource extends ExternalResource {
 
 		webUIPort = miniCluster.getRestAddress().getPort();
 	}
-
-	/**
-	 * Mini cluster resource configuration object.
-	 */
-	public static class MiniClusterResourceConfiguration {
-		private final Configuration configuration;
-
-		private final int numberTaskManagers;
-
-		private final int numberSlotsPerTaskManager;
-
-		private final Time shutdownTimeout;
-
-		public MiniClusterResourceConfiguration(
-				Configuration configuration,
-				int numberTaskManagers,
-				int numberSlotsPerTaskManager) {
-			this(
-				configuration,
-				numberTaskManagers,
-				numberSlotsPerTaskManager,
-				AkkaUtils.getTimeoutAsTime(configuration));
-		}
-
-		public MiniClusterResourceConfiguration(
-				Configuration configuration,
-				int numberTaskManagers,
-				int numberSlotsPerTaskManager,
-				Time shutdownTimeout) {
-			this.configuration = Preconditions.checkNotNull(configuration);
-			this.numberTaskManagers = numberTaskManagers;
-			this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
-			this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
-		}
-
-		public Configuration getConfiguration() {
-			return configuration;
-		}
-
-		public int getNumberTaskManagers() {
-			return numberTaskManagers;
-		}
-
-		public int getNumberSlotsPerTaskManager() {
-			return numberSlotsPerTaskManager;
-		}
-
-		public Time getShutdownTimeout() {
-			return shutdownTimeout;
-		}
-	}
-
-	// ---------------------------------------------
-	// Enum definitions
-	// ---------------------------------------------
-
-	/**
-	 * Type of the mini cluster to start.
-	 */
-	public enum MiniClusterType {
-		LEGACY,
-		NEW
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
new file mode 100644
index 0000000..c938920
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Mini cluster resource configuration object.
+ */
+public class MiniClusterResourceConfiguration {
+
+	private final Configuration configuration;
+
+	private final int numberTaskManagers;
+
+	private final int numberSlotsPerTaskManager;
+
+	private final Time shutdownTimeout;
+
+	private final TestBaseUtils.CodebaseType codebaseType;
+
+	private final RpcServiceSharing rpcServiceSharing;
+
+	MiniClusterResourceConfiguration(
+		Configuration configuration,
+		int numberTaskManagers,
+		int numberSlotsPerTaskManager,
+		Time shutdownTimeout,
+		TestBaseUtils.CodebaseType codebaseType,
+		RpcServiceSharing rpcServiceSharing) {
+		this.configuration = Preconditions.checkNotNull(configuration);
+		this.numberTaskManagers = numberTaskManagers;
+		this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
+		this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
+		this.codebaseType = Preconditions.checkNotNull(codebaseType);
+		this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
+	}
+
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+
+	public int getNumberTaskManagers() {
+		return numberTaskManagers;
+	}
+
+	public int getNumberSlotsPerTaskManager() {
+		return numberSlotsPerTaskManager;
+	}
+
+	public Time getShutdownTimeout() {
+		return shutdownTimeout;
+	}
+
+	/**
+	 * @deprecated Will be irrelevant once the legacy mode has been removed.
+	 */
+	@Deprecated
+	public TestBaseUtils.CodebaseType getCodebaseType() {
+		return codebaseType;
+	}
+
+	public RpcServiceSharing getRpcServiceSharing() {
+		return rpcServiceSharing;
+	}
+
+	/**
+	 * Builder for {@link MiniClusterResourceConfiguration}.
+	 */
+	public static final class Builder {
+
+		private Configuration configuration = new Configuration();
+		private int numberTaskManagers = 1;
+		private int numberSlotsPerTaskManager = 1;
+		private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration);
+		private TestBaseUtils.CodebaseType codebaseType = TestBaseUtils.getCodebaseType();
+
+		private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED;
+
+		public Builder setConfiguration(Configuration configuration) {
+			this.configuration = configuration;
+			return this;
+		}
+
+		public Builder setNumberTaskManagers(int numberTaskManagers) {
+			this.numberTaskManagers = numberTaskManagers;
+			return this;
+		}
+
+		public Builder setNumberSlotsPerTaskManager(int numberSlotsPerTaskManager) {
+			this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
+			return this;
+		}
+
+		public Builder setShutdownTimeout(Time shutdownTimeout) {
+			this.shutdownTimeout = shutdownTimeout;
+			return this;
+		}
+
+		/**
+		 * @deprecated Will be irrelevant once the legacy mode has been removed.
+		 */
+		@Deprecated
+		public Builder setCodebaseType(TestBaseUtils.CodebaseType codebaseType) {
+			this.codebaseType = codebaseType;
+			return this;
+		}
+
+		public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) {
+			this.rpcServiceSharing = rpcServiceSharing;
+			return this;
+		}
+
+		public MiniClusterResourceConfiguration build() {
+			return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, codebaseType, rpcServiceSharing);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 7e9b12e..10fe1f0 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -44,6 +44,8 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.File;
@@ -103,6 +105,9 @@ public class TestBaseUtils extends TestLogger {
 
 	public static final Time DEFAULT_HTTP_TIMEOUT = Time.seconds(10L);
 
+	static final String NEW_CODEBASE = "new";
+	static final String CODEBASE_KEY = "codebase";
+
 	// ------------------------------------------------------------------------
 
 	protected static File logDir;
@@ -673,6 +678,26 @@ public class TestBaseUtils extends TestLogger {
 		throw new TimeoutException("Could not get HTTP response in time since the service is still unavailable.");
 	}
 
+	@Nonnull
+	public static CodebaseType getCodebaseType() {
+		return Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? CodebaseType.NEW : CodebaseType.LEGACY;
+	}
+
+	public static boolean isNewCodebase() {
+		return CodebaseType.NEW == getCodebaseType();
+	}
+
+	/**
+	 * Type of the mini cluster to start.
+	 *
+	 * @deprecated Will be irrelevant once the legacy mode has been removed.
+	 */
+	@Deprecated
+	public enum CodebaseType {
+		LEGACY,
+		NEW
+	}
+
 	/**
 	 * Comparator for comparable Tuples.
 	 * @param <T> tuple type

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index 3d90833..d186e36 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -50,10 +51,10 @@ public class AccumulatorErrorITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			3));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(3)
+			.build());
 
 	public static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 347c2e3..5944d3a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -84,10 +85,11 @@ public class AccumulatorLiveITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			1,
-			1),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(1)
+			.build(),
 		true);
 
 	private static Configuration getConfiguration() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index cac16f0..871df7d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -56,10 +57,11 @@ public abstract class CancelingTestBase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			4),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(4)
+			.build(),
 		true);
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 61baefa..a1e31e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -106,10 +107,11 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	protected final MiniClusterResource getMiniClusterResource() {
 		return new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				getConfigurationSafe(),
-				2,
-				PARALLELISM / 2));
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(getConfigurationSafe())
+				.setNumberTaskManagers(2)
+				.setNumberSlotsPerTaskManager(PARALLELISM / 2)
+				.build());
 	}
 
 	private Configuration getConfigurationSafe() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 07167a9..b82abb9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -66,10 +67,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			PARALLELISM / 2));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(PARALLELISM / 2)
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index e5868c3..caa1791 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
@@ -79,10 +80,11 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			NUM_TASK_MANAGERS,
-			NUM_TASK_SLOTS));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(NUM_TASK_MANAGERS)
+			.setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 55631a2..e7fcefd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -52,6 +52,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -127,10 +128,11 @@ public class RescalingITCase extends TestLogger {
 			config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 			cluster = new MiniClusterResource(
-				new MiniClusterResource.MiniClusterResourceConfiguration(
-					config,
-					numTaskManagers,
-					slotsPerTaskManager),
+				new MiniClusterResourceConfiguration.Builder()
+					.setConfiguration(config)
+					.setNumberTaskManagers(numTaskManagers)
+					.setNumberSlotsPerTaskManager(numSlots)
+					.build(),
 				true);
 			cluster.before();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index aebaa63..f4fe8e7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.state.ManualWindowSpeedITCase;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.test.TestingServer;
@@ -263,10 +264,11 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 		}
 
 		MiniClusterResource cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				NUM_TASK_MANAGERS,
-				SLOTS_PER_TASK_MANAGER),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(NUM_TASK_MANAGERS)
+				.setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+				.build(),
 			true);
 
 		cluster.before();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2805144..b46c485 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -227,11 +228,11 @@ public class SavepointITCase extends TestLogger {
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 		MiniClusterResource cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				numTaskManagers,
-				numSlotsPerTaskManager
-			),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(numTaskManagers)
+				.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+				.build(),
 			true);
 		cluster.before();
 		ClusterClient<?> client = cluster.getClusterClient();
@@ -296,11 +297,11 @@ public class SavepointITCase extends TestLogger {
 
 		// Start Flink
 		MiniClusterResource cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				numTaskManagers,
-				numSlotsPerTaskManager
-			),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(numTaskManagers)
+				.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+				.build(),
 			true);
 
 		LOG.info("Shutting down Flink cluster.");
@@ -341,11 +342,11 @@ public class SavepointITCase extends TestLogger {
 		// create a new TestingCluster to make sure we start with completely
 		// new resources
 		cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				numTaskManagers,
-				numSlotsPerTaskManager
-			),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(numTaskManagers)
+				.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+				.build(),
 			true);
 		LOG.info("Restarting Flink cluster.");
 		cluster.before();
@@ -578,11 +579,11 @@ public class SavepointITCase extends TestLogger {
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 		MiniClusterResource cluster = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				1,
-				2 * jobGraph.getMaximumParallelism()
-			),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(2 * jobGraph.getMaximumParallelism())
+				.build(),
 			true);
 		cluster.before();
 		ClusterClient<?> client = cluster.getClusterClient();
@@ -708,11 +709,11 @@ public class SavepointITCase extends TestLogger {
 
 		MiniClusterResource get() {
 			return new MiniClusterResource(
-				new MiniClusterResource.MiniClusterResourceConfiguration(
-					config,
-					numTaskManagers,
-					numSlotsPerTaskManager
-				),
+				new MiniClusterResourceConfiguration.Builder()
+					.setConfiguration(config)
+					.setNumberTaskManagers(numTaskManagers)
+					.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+					.build(),
 				true);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 5ac1214..0ed9d6b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -19,9 +19,9 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -42,10 +42,10 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new Configuration(),
-			NUM_TASK_MANAGERS,
-			NUM_TASK_SLOTS));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(NUM_TASK_MANAGERS)
+			.setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+			.build());
 
 	/**
 	 * Implementations are expected to assemble the test topology in this function

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index bc73717..1b93684 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -74,10 +75,11 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			PARALLELISM / 2));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(PARALLELISM / 2)
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 156d448..6fb18e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -109,10 +110,11 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 		// we have to manage this manually because we have to create the ZooKeeper server
 		// ahead of this
 		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				config,
-				NUM_TMS,
-				NUM_SLOTS_PER_TM),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(config)
+				.setNumberTaskManagers(NUM_TMS)
+				.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+				.build(),
 			true);
 
 		miniClusterResource.before();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 84cb88a..0d43aa9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.OptionalFailure;
 
@@ -88,10 +89,11 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 
 	protected SavepointMigrationTestBase() throws Exception {
 		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResource.MiniClusterResourceConfiguration(
-				getConfiguration(),
-				1,
-				DEFAULT_PARALLELISM),
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(getConfiguration())
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+				.build(),
 			true);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index d8ed2ee..93bec74 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -53,12 +55,11 @@ public class JobRetrievalITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new Configuration(),
-			1,
-			4
-		),
-		MiniClusterResource.MiniClusterType.NEW
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(4)
+			.build(),
+		TestBaseUtils.CodebaseType.NEW
 	);
 
 	private RestClusterClient<StandaloneClusterId> client;

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index ecd16a1..5ca9ba2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -51,10 +52,11 @@ public class JobSubmissionFailsITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			NUM_TM,
-			NUM_SLOTS / NUM_TM),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(NUM_TM)
+			.setNumberSlotsPerTaskManager(NUM_SLOTS / NUM_TM)
+			.build(),
 		true);
 
 	private static Configuration getConfiguration() {


[8/9] flink git commit: [FLINK-9493] Forward cause when releasing a TaskManager at the SlotPool

Posted by tr...@apache.org.
[FLINK-9493] Forward cause when releasing a TaskManager at the SlotPool

This closes #6202.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53728132
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53728132
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53728132

Branch: refs/heads/master
Commit: 537281323adcbcf71cddfd824c56677e66c718ff
Parents: 029b9c0
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Thu Jun 21 19:43:42 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/jobmaster/JobMaster.java    |  2 +-
 .../flink/runtime/jobmaster/slotpool/SlotPool.java   | 15 ++++++++-------
 .../runtime/jobmaster/slotpool/SlotPoolGateway.java  |  3 ++-
 .../jobmanager/scheduler/SchedulerTestBase.java      |  2 +-
 .../runtime/jobmaster/slotpool/SlotPoolTest.java     |  4 ++--
 5 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f96e0ae..aaa2f0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -648,7 +648,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		log.debug("Disconnect TaskExecutor {} because: {}", resourceID, cause.getMessage());
 
 		taskManagerHeartbeatManager.unmonitorTarget(resourceID);
-		CompletableFuture<Acknowledge> releaseFuture = slotPoolGateway.releaseTaskManager(resourceID);
+		CompletableFuture<Acknowledge> releaseFuture = slotPoolGateway.releaseTaskManager(resourceID, cause);
 
 		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index b090cf8..6ab21c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -87,7 +87,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * <p>All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to
  * eliminate ambiguities.
  *
- * TODO : Make pending requests location preference aware
+ * <p>TODO : Make pending requests location preference aware
  * TODO : Make pass location preferences to ResourceManager when sending a slot request
  */
 public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {
@@ -219,7 +219,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 		// release all registered slots by releasing the corresponding TaskExecutors
 		for (ResourceID taskManagerResourceId : registeredTaskManagers) {
-			releaseTaskManagerInternal(taskManagerResourceId);
+			final FlinkException cause = new FlinkException(
+				"Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool");
+			releaseTaskManagerInternal(taskManagerResourceId, cause);
 		}
 
 		clear();
@@ -1043,11 +1045,12 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	 * when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.
 	 *
 	 * @param resourceId The id of the TaskManager
+	 * @param cause for the releasing of the TaskManager
 	 */
 	@Override
-	public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId) {
+	public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId, final Exception cause) {
 		if (registeredTaskManagers.remove(resourceId)) {
-			releaseTaskManagerInternal(resourceId);
+			releaseTaskManagerInternal(resourceId, cause);
 		}
 
 		return CompletableFuture.completedFuture(Acknowledge.get());
@@ -1063,9 +1066,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		removePendingRequest(slotRequestId);
 	}
 
-	private void releaseTaskManagerInternal(final ResourceID resourceId) {
-		final FlinkException cause = new FlinkException("Releasing TaskManager " + resourceId + '.');
-
+	private void releaseTaskManagerInternal(final ResourceID resourceId, final Exception cause) {
 		final Set<AllocatedSlot> removedSlots = new HashSet<>(allocatedSlots.removeSlotsForTaskManager(resourceId));
 
 		for (AllocatedSlot allocatedSlot : removedSlots) {

http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 1aad92a..34d9c7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -86,9 +86,10 @@ public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {
 	 * Releases a TaskExecutor with the given {@link ResourceID} from the {@link SlotPool}.
 	 *
 	 * @param resourceId identifying the TaskExecutor which shall be released from the SlotPool
+	 * @param cause for the releasing of the TaskManager
 	 * @return Future acknowledge which is completed after the TaskExecutor has been released
 	 */
-	CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId);
+	CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId, final Exception cause);
 
 	/**
 	 * Offers a slot to the {@link SlotPool}. The slot offer can be accepted or

http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 3d54412..940934f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -296,7 +296,7 @@ public class SchedulerTestBase extends TestLogger {
 		@Override
 		public void releaseTaskManager(ResourceID resourceId) {
 			try {
-				slotPool.releaseTaskManager(resourceId).get();
+				slotPool.releaseTaskManager(resourceId, null).get();
 			} catch (Exception e) {
 				throw new RuntimeException("Should not have happened.", e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/53728132/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 6f88e40..91ccc81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -378,7 +378,7 @@ public class SlotPoolTest extends TestLogger {
 
 			slot1.tryAssignPayload(dummyPayload);
 
-			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());
+			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(), null);
 
 			releaseFuture.get();
 			assertFalse(slot1.isAlive());
@@ -718,7 +718,7 @@ public class SlotPoolTest extends TestLogger {
 				timeout);
 
 			// release the TaskExecutor before we get a response from the slot releasing
-			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID()).get();
+			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(), null).get();
 
 			// let the slot releasing fail --> since the owning TaskExecutor is no longer registered
 			// the slot should be discarded


[4/9] flink git commit: [hotfix] Only run MiniClusterResource if specified CodebaseType equals actual CodebaseType

Posted by tr...@apache.org.
[hotfix] Only run MiniClusterResource if specified CodebaseType equals actual CodebaseType


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e64d4dbd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e64d4dbd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e64d4dbd

Branch: refs/heads/master
Commit: e64d4dbd3e8b02b7070118b007cbfd0433da7b54
Parents: 8bb086d
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 21 15:08:18 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200

----------------------------------------------------------------------
 .../runtime/webmonitor/WebFrontendITCase.java   |  7 ++-
 .../flink/test/util/MiniClusterResource.java    | 46 +++++++++++++-------
 .../flink/test/misc/AutoParallelismITCase.java  |  2 +-
 3 files changed, 35 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e64d4dbd/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 776a267..c20a44e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -56,7 +56,6 @@ import java.nio.file.Files;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -155,7 +154,7 @@ public class WebFrontendITCase extends TestLogger {
 		if (notFoundJobConnection.getResponseCode() >= 400) {
 			// we don't set the content-encoding header
 			Assert.assertNull(notFoundJobConnection.getContentEncoding());
-			if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
+			if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
 				Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
 			} else {
 				Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
@@ -283,7 +282,7 @@ public class WebFrontendITCase extends TestLogger {
 		final Deadline deadline = testTimeout.fromNow();
 
 		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
-			if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
+			if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
 				// stop the job
 				client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
 				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
@@ -358,7 +357,7 @@ public class WebFrontendITCase extends TestLogger {
 			HttpTestClient.SimpleHttpResponse response = client
 				.getNextResponse(deadline.timeLeft());
 
-			if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
+			if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
 				assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
 			} else {
 				assertEquals(HttpResponseStatus.OK, response.getStatus());

http://git-wip-us.apache.org/repos/asf/flink/blob/e64d4dbd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index bbb945f..150d956 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -21,7 +21,13 @@ package org.apache.flink.test.util;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.*;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -30,6 +36,8 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+
+import org.junit.Assume;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -38,6 +46,9 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+
 /**
  * Starts a Flink mini cluster as a resource and registers the respective
  * ExecutionEnvironment and StreamExecutionEnvironment.
@@ -50,7 +61,7 @@ public class MiniClusterResource extends ExternalResource {
 
 	private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
 
-	private final TestBaseUtils.CodebaseType miniClusterType;
+	private final TestBaseUtils.CodebaseType codebaseType;
 
 	private JobExecutorService jobExecutorService;
 
@@ -74,12 +85,12 @@ public class MiniClusterResource extends ExternalResource {
 			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
 			final boolean enableClusterClient) {
 		this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
-		this.miniClusterType = miniClusterResourceConfiguration.getCodebaseType();
+		this.codebaseType = miniClusterResourceConfiguration.getCodebaseType();
 		this.enableClusterClient = enableClusterClient;
 	}
 
-	public TestBaseUtils.CodebaseType getMiniClusterType() {
-		return miniClusterType;
+	public TestBaseUtils.CodebaseType getCodebaseType() {
+		return codebaseType;
 	}
 
 	public int getNumberSlots() {
@@ -110,9 +121,12 @@ public class MiniClusterResource extends ExternalResource {
 
 	@Override
 	public void before() throws Exception {
+		// verify that we are running in the correct test profile
+		Assume.assumeThat(TestBaseUtils.getCodebaseType(), is(equalTo(codebaseType)));
+
 		temporaryFolder.create();
 
-		startJobExecutorService(miniClusterType);
+		startJobExecutorService(codebaseType);
 
 		numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers();
 
@@ -140,17 +154,19 @@ public class MiniClusterResource extends ExternalResource {
 
 		clusterClient = null;
 
-		final CompletableFuture<?> terminationFuture = jobExecutorService.closeAsync();
+		if (jobExecutorService != null) {
+			final CompletableFuture<?> terminationFuture = jobExecutorService.closeAsync();
 
-		try {
-			terminationFuture.get(
-				miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(),
-				TimeUnit.MILLISECONDS);
-		} catch (Exception e) {
-			exception = ExceptionUtils.firstOrSuppressed(e, exception);
-		}
+			try {
+				terminationFuture.get(
+					miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(),
+					TimeUnit.MILLISECONDS);
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
 
-		jobExecutorService = null;
+			jobExecutorService = null;
+		}
 
 		if (exception != null) {
 			LOG.warn("Could not properly shut down the MiniClusterResource.", exception);

http://git-wip-us.apache.org/repos/asf/flink/blob/e64d4dbd/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 189962c..4826a46 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -80,7 +80,7 @@ public class AutoParallelismITCase extends TestLogger {
 			assertEquals(PARALLELISM, resultCollection.size());
 		}
 		catch (Exception ex) {
-			if (MINI_CLUSTER_RESOURCE.getMiniClusterType().equals(TestBaseUtils.CodebaseType.LEGACY)) {
+			if (MINI_CLUSTER_RESOURCE.getCodebaseType() == TestBaseUtils.CodebaseType.LEGACY) {
 				throw ex;
 			}
 			assertTrue(


[7/9] flink git commit: [hotfix] Make ActorSystemLoader in ClusterClient configurable

Posted by tr...@apache.org.
[hotfix] Make ActorSystemLoader in ClusterClient configurable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b946c5e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b946c5e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b946c5e5

Branch: refs/heads/master
Commit: b946c5e569eeabf90666bbda9c458bca71029a7f
Parents: e64d4db
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 21 15:24:40 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     | 48 +++++++++++++++++---
 .../client/program/StandaloneClusterClient.java |  4 ++
 .../apache/flink/yarn/YarnClusterClient.java    |  4 +-
 3 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b946c5e5/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index c082b10..a157d34 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -104,8 +104,8 @@ public abstract class ClusterClient<T> {
 	/** The optimizer used in the optimization of batch programs. */
 	final Optimizer compiler;
 
-	/** The actor system used to communicate with the JobManager. Lazily initialized upon first use */
-	protected final LazyActorSystemLoader actorSystemLoader;
+	/** The actor system used to communicate with the JobManager. */
+	protected final ActorSystemLoader actorSystemLoader;
 
 	/** Configuration of the client. */
 	protected final Configuration flinkConfig;
@@ -171,7 +171,10 @@ public abstract class ClusterClient<T> {
 	 * @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval
 	 * @param sharedHaServices true if the HighAvailabilityServices are shared and must not be shut down
 	 */
-	public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices) {
+	public ClusterClient(
+			Configuration flinkConfig,
+			HighAvailabilityServices highAvailabilityServices,
+			boolean sharedHaServices) {
 		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
 		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
 
@@ -188,14 +191,45 @@ public abstract class ClusterClient<T> {
 		this.sharedHaServices = sharedHaServices;
 	}
 
+	public ClusterClient(
+			Configuration flinkConfig,
+			HighAvailabilityServices highAvailabilityServices,
+			boolean sharedHaServices,
+			ActorSystemLoader actorSystemLoader) {
+		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
+
+		this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
+		this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
+
+		this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader);
+
+		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+		this.sharedHaServices = sharedHaServices;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Startup & Shutdown
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Interface which allows to load an {@link ActorSystem}.
+	 */
+	public interface ActorSystemLoader extends AutoCloseable {
+
+		/**
+		 * Get an {@link ActorSystem}.
+		 *
+		 * @return {@link ActorSystem}
+		 * @throws FlinkException
+		 */
+		ActorSystem get() throws FlinkException;
+	}
+
+	/**
 	 * Utility class to lazily instantiate an {@link ActorSystem}.
 	 */
-	protected static class LazyActorSystemLoader {
+	protected static class LazyActorSystemLoader implements ActorSystemLoader {
 
 		private final Logger log;
 
@@ -226,7 +260,8 @@ public abstract class ClusterClient<T> {
 			return actorSystem != null;
 		}
 
-		public void shutdown() {
+		@Override
+		public void close() throws Exception {
 			if (isLoaded()) {
 				actorSystem.shutdown();
 				actorSystem.awaitTermination();
@@ -239,6 +274,7 @@ public abstract class ClusterClient<T> {
 		 * @return ActorSystem
 		 * @throws Exception if the ActorSystem could not be created
 		 */
+		@Override
 		public ActorSystem get() throws FlinkException {
 
 			if (!isLoaded()) {
@@ -276,7 +312,7 @@ public abstract class ClusterClient<T> {
 	 */
 	public void shutdown() throws Exception {
 		synchronized (this) {
-			actorSystemLoader.shutdown();
+			actorSystemLoader.close();
 
 			if (!sharedHaServices && highAvailabilityServices != null) {
 				highAvailabilityServices.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/b946c5e5/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index e502add..caee34f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -51,6 +51,10 @@ public class StandaloneClusterClient extends ClusterClient<StandaloneClusterId>
 		super(config, highAvailabilityServices, sharedHaServices);
 	}
 
+	public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices, ActorSystemLoader actorSystemLoader) {
+		super(config, highAvailabilityServices, sharedHaServices, actorSystemLoader);
+	}
+
 	@Override
 	public void waitForClusterToBeReady() {}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b946c5e5/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 0d7546e..a5aca5d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -285,14 +285,14 @@ public class YarnClusterClient extends ClusterClient<ApplicationId> {
 	private static class LazApplicationClientLoader {
 
 		private final Configuration flinkConfig;
-		private final LazyActorSystemLoader actorSystemLoader;
+		private final ActorSystemLoader actorSystemLoader;
 		private final HighAvailabilityServices highAvailabilityServices;
 
 		private ActorRef applicationClient;
 
 		private LazApplicationClientLoader(
 				Configuration flinkConfig,
-				LazyActorSystemLoader actorSystemLoader,
+				ActorSystemLoader actorSystemLoader,
 				HighAvailabilityServices highAvailabilityServices) {
 			this.flinkConfig = Preconditions.checkNotNull(flinkConfig, "flinkConfig");
 			this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader, "actorSystemLoader");


[2/9] flink git commit: [hotfix] Start always ClusterClient for MiniClusterResource

Posted by tr...@apache.org.
[hotfix] Start always ClusterClient for MiniClusterResource


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/029b9c06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/029b9c06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/029b9c06

Branch: refs/heads/master
Commit: 029b9c060ca5b3af03846a06c51d26913564b1f4
Parents: ccfe9bb
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 21 15:15:51 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTestBase.java         |  3 +-
 .../gateway/local/LocalExecutorITCase.java      |  3 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |  3 +-
 .../HAQueryableStateFsBackendITCase.java        |  3 +-
 .../HAQueryableStateRocksDBBackendITCase.java   |  3 +-
 .../NonHAQueryableStateFsBackendITCase.java     |  3 +-
 ...NonHAQueryableStateRocksDBBackendITCase.java |  3 +-
 .../runtime/webmonitor/WebFrontendITCase.java   |  4 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  7 +++
 .../flink/test/util/MiniClusterResource.java    | 46 +++++++++++---------
 .../accumulators/AccumulatorLiveITCase.java     |  3 +-
 .../test/cancelling/CancelingTestBase.java      |  3 +-
 .../test/checkpointing/RescalingITCase.java     |  3 +-
 .../ResumeCheckpointManuallyITCase.java         |  3 +-
 .../test/checkpointing/SavepointITCase.java     | 15 +++----
 .../ZooKeeperHighAvailabilityITCase.java        |  3 +-
 .../utils/SavepointMigrationTestBase.java       |  3 +-
 .../failing/JobSubmissionFailsITCase.java       |  3 +-
 .../flink/test/runtime/NettyEpollITCase.java    |  3 +-
 .../runtime/NetworkStackThroughputITCase.java   |  4 +-
 .../AbstractOperatorRestoreTestBase.java        |  3 +-
 .../test/streaming/runtime/TimestampITCase.java |  3 +-
 22 files changed, 56 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 806c90c..ae77b6f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -86,8 +86,7 @@ public abstract class KafkaTestBase extends TestLogger {
 			.setConfiguration(getFlinkConfiguration())
 			.setNumberTaskManagers(NUM_TMS)
 			.setNumberSlotsPerTaskManager(TM_SLOTS)
-			.build(),
-		true);
+			.build());
 
 	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index ad2d509..c4f282b 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -76,8 +76,7 @@ public class LocalExecutorITCase extends TestLogger {
 			.setConfiguration(getConfig())
 			.setNumberTaskManagers(NUM_TMS)
 			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-			.build(),
-		true);
+			.build());
 
 	private static ClusterClient<?> clusterClient;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index b7afca9..179cf9c 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -67,8 +67,7 @@ public class JMXJobManagerMetricTest extends TestLogger {
 			.setConfiguration(getConfiguration())
 			.setNumberSlotsPerTaskManager(1)
 			.setNumberTaskManagers(1)
-			.build(),
-		true);
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration flinkConfiguration = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index f977a8e..63feeaf 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -73,8 +73,7 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB
 				.setConfiguration(getConfig())
 				.setNumberTaskManagers(NUM_TMS)
 				.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-				.build(),
-			true);
+				.build());
 
 		miniClusterResource.before();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 25f56d7..702e712 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -73,8 +73,7 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState
 				.setConfiguration(getConfig())
 				.setNumberTaskManagers(NUM_TMS)
 				.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-				.build(),
-			true);
+				.build());
 
 		miniClusterResource.before();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index 0d61e0d..b166002 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -57,8 +57,7 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe
 			.setConfiguration(getConfig())
 			.setNumberTaskManagers(NUM_TMS)
 			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-			.build(),
-		true);
+			.build());
 
 	@Override
 	protected AbstractStateBackend createStateBackend() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 002ff9f..dedf8b1 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -57,8 +57,7 @@ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableSt
 			.setConfiguration(getConfig())
 			.setNumberTaskManagers(NUM_TMS)
 			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-			.build(),
-		true);
+			.build());
 
 	@Override
 	protected AbstractStateBackend createStateBackend() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index c20a44e..380ab96 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -84,9 +84,7 @@ public class WebFrontendITCase extends TestLogger {
 			.setConfiguration(CLUSTER_CONFIGURATION)
 			.setNumberTaskManagers(NUM_TASK_MANAGERS)
 			.setNumberSlotsPerTaskManager(NUM_SLOTS)
-			.build(),
-		true
-	);
+			.build());
 
 	private static Configuration getClusterConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 6c9ee5b..8d9e1ee 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -448,6 +448,13 @@ abstract class FlinkMiniCluster(
       ioExecutor)
   }
 
+  def firstActorSystem(): Option[ActorSystem] = {
+    jobManagerActorSystems match {
+      case Some(jmActorSystems) => Some(jmActorSystems.head)
+      case None => None
+    }
+  }
+
   protected def startInternalShutdown(): Unit = {
     webMonitor foreach {
       _.stop()

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 150d956..94842a6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.DefaultActorSystemLoader;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
@@ -32,11 +33,13 @@ import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
+import akka.actor.ActorSystem;
 import org.junit.Assume;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
@@ -46,6 +49,8 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 
@@ -65,8 +70,6 @@ public class MiniClusterResource extends ExternalResource {
 
 	private JobExecutorService jobExecutorService;
 
-	private final boolean enableClusterClient;
-
 	private ClusterClient<?> clusterClient;
 
 	private Configuration restClusterClientConfig;
@@ -78,15 +81,8 @@ public class MiniClusterResource extends ExternalResource {
 	private int webUIPort = -1;
 
 	public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
-		this(miniClusterResourceConfiguration, false);
-	}
-
-	public MiniClusterResource(
-			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
-			final boolean enableClusterClient) {
 		this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
 		this.codebaseType = miniClusterResourceConfiguration.getCodebaseType();
-		this.enableClusterClient = enableClusterClient;
 	}
 
 	public TestBaseUtils.CodebaseType getCodebaseType() {
@@ -98,12 +94,6 @@ public class MiniClusterResource extends ExternalResource {
 	}
 
 	public ClusterClient<?> getClusterClient() {
-		if (!enableClusterClient) {
-			// this check is technically only necessary for legacy clusters
-			// we still fail here to keep the behaviors in sync
-			throw new IllegalStateException("To use the client you must enable it with the constructor.");
-		}
-
 		return clusterClient;
 	}
 
@@ -194,12 +184,27 @@ public class MiniClusterResource extends ExternalResource {
 
 		final LocalFlinkMiniCluster flinkMiniCluster = TestBaseUtils.startCluster(
 			configuration,
-			!enableClusterClient); // the cluster client only works if separate actor systems are used
+			miniClusterResourceConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED);
 
 		jobExecutorService = flinkMiniCluster;
-		if (enableClusterClient) {
-			clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(), true);
+
+		switch (miniClusterResourceConfiguration.getRpcServiceSharing()) {
+			case SHARED:
+				Option<ActorSystem> actorSystemOption = flinkMiniCluster.firstActorSystem();
+				Preconditions.checkState(actorSystemOption.isDefined());
+
+				final ActorSystem actorSystem = actorSystemOption.get();
+				clusterClient = new StandaloneClusterClient(
+					configuration,
+					flinkMiniCluster.highAvailabilityServices(),
+					true,
+					new DefaultActorSystemLoader(actorSystem));
+				break;
+			case DEDICATED:
+				clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(), true);
+				break;
 		}
+
 		Configuration restClientConfig = new Configuration();
 		restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort());
 		this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
@@ -240,9 +245,8 @@ public class MiniClusterResource extends ExternalResource {
 		configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
 
 		jobExecutorService = miniCluster;
-		if (enableClusterClient) {
-			clusterClient = new MiniClusterClient(configuration, miniCluster);
-		}
+		clusterClient = new MiniClusterClient(configuration, miniCluster);
+
 		Configuration restClientConfig = new Configuration();
 		restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost());
 		restClientConfig.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 5944d3a..2c12e44 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -89,8 +89,7 @@ public class AccumulatorLiveITCase extends TestLogger {
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(1)
 			.setNumberSlotsPerTaskManager(1)
-			.build(),
-		true);
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 871df7d..5f7ca86 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -61,8 +61,7 @@ public abstract class CancelingTestBase extends TestLogger {
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(2)
 			.setNumberSlotsPerTaskManager(4)
-			.build(),
-		true);
+			.build());
 
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index e7fcefd..3a3eada 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -132,8 +132,7 @@ public class RescalingITCase extends TestLogger {
 					.setConfiguration(config)
 					.setNumberTaskManagers(numTaskManagers)
 					.setNumberSlotsPerTaskManager(numSlots)
-					.build(),
-				true);
+					.build());
 			cluster.before();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index f4fe8e7..24f408d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -268,8 +268,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 				.setConfiguration(config)
 				.setNumberTaskManagers(NUM_TASK_MANAGERS)
 				.setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
-				.build(),
-			true);
+				.build());
 
 		cluster.before();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index b46c485..b66c745 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -232,8 +232,7 @@ public class SavepointITCase extends TestLogger {
 				.setConfiguration(config)
 				.setNumberTaskManagers(numTaskManagers)
 				.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
-				.build(),
-			true);
+				.build());
 		cluster.before();
 		ClusterClient<?> client = cluster.getClusterClient();
 
@@ -301,8 +300,7 @@ public class SavepointITCase extends TestLogger {
 				.setConfiguration(config)
 				.setNumberTaskManagers(numTaskManagers)
 				.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
-				.build(),
-			true);
+				.build());
 
 		LOG.info("Shutting down Flink cluster.");
 		cluster.before();
@@ -346,8 +344,7 @@ public class SavepointITCase extends TestLogger {
 				.setConfiguration(config)
 				.setNumberTaskManagers(numTaskManagers)
 				.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
-				.build(),
-			true);
+				.build());
 		LOG.info("Restarting Flink cluster.");
 		cluster.before();
 		client = cluster.getClusterClient();
@@ -583,8 +580,7 @@ public class SavepointITCase extends TestLogger {
 				.setConfiguration(config)
 				.setNumberTaskManagers(1)
 				.setNumberSlotsPerTaskManager(2 * jobGraph.getMaximumParallelism())
-				.build(),
-			true);
+				.build());
 		cluster.before();
 		ClusterClient<?> client = cluster.getClusterClient();
 
@@ -713,8 +709,7 @@ public class SavepointITCase extends TestLogger {
 					.setConfiguration(config)
 					.setNumberTaskManagers(numTaskManagers)
 					.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
-					.build(),
-				true);
+					.build());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 6fb18e8..e02ed01 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -114,8 +114,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 				.setConfiguration(config)
 				.setNumberTaskManagers(NUM_TMS)
 				.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-				.build(),
-			true);
+				.build());
 
 		miniClusterResource.before();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 0d43aa9..a5267b5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -93,8 +93,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 				.setConfiguration(getConfiguration())
 				.setNumberTaskManagers(1)
 				.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
-				.build(),
-			true);
+				.build());
 	}
 
 	private Configuration getConfiguration() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index 5ca9ba2..5738eb7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -56,8 +56,7 @@ public class JobSubmissionFailsITCase extends TestLogger {
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(NUM_TM)
 			.setNumberSlotsPerTaskManager(NUM_SLOTS / NUM_TM)
-			.build(),
-		true);
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
index 5559935..418e947 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
@@ -79,8 +79,7 @@ public class NettyEpollITCase extends TestLogger {
 					.setConfiguration(config)
 					.setNumberTaskManagers(NUM_TASK_MANAGERS)
 					.setNumberSlotsPerTaskManager(1)
-					.build(),
-				true);
+					.build());
 			cluster.before();
 			return cluster;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index c2748a5..96cb397 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -238,9 +238,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
 				new MiniClusterResourceConfiguration.Builder()
 					.setNumberTaskManagers(numTaskManagers)
 					.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
-					.build(),
-				true
-			);
+					.build());
 			cluster.before();
 
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 60027c9..7eebde8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -75,8 +75,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(NUM_TMS)
 			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-			.build(),
-		true);
+			.build());
 
 	private final boolean allowNonRestoredState;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/029b9c06/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 21051c5..5d165bf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -83,8 +83,7 @@ public class TimestampITCase extends TestLogger {
 			.setConfiguration(getConfiguration())
 			.setNumberTaskManagers(NUM_TASK_MANAGERS)
 			.setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
-			.build(),
-		true);
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();


[9/9] flink git commit: [hotfix] Add DefaultActorSystemLoader which returns the given ActorSystem

Posted by tr...@apache.org.
[hotfix] Add DefaultActorSystemLoader which returns the given ActorSystem


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccfe9bb8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccfe9bb8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccfe9bb8

Branch: refs/heads/master
Commit: ccfe9bb87e67e9130686c4f9f36b76d75e061605
Parents: b946c5e
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 21 15:29:08 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200

----------------------------------------------------------------------
 .../flink/client/program/ActorSystemLoader.java | 37 +++++++++++++++
 .../flink/client/program/ClusterClient.java     | 14 ------
 .../program/DefaultActorSystemLoader.java       | 47 ++++++++++++++++++++
 .../apache/flink/yarn/YarnClusterClient.java    |  1 +
 4 files changed, 85 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ccfe9bb8/flink-clients/src/main/java/org/apache/flink/client/program/ActorSystemLoader.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ActorSystemLoader.java b/flink-clients/src/main/java/org/apache/flink/client/program/ActorSystemLoader.java
new file mode 100644
index 0000000..eadfc42
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ActorSystemLoader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.util.FlinkException;
+
+import akka.actor.ActorSystem;
+
+/**
+ * Interface for loading an {@link ActorSystem}.
+ */
+public interface ActorSystemLoader extends AutoCloseable {
+
+	/**
+	 * Get an {@link ActorSystem}.
+	 *
+	 * @return {@link ActorSystem}
+	 * @throws FlinkException if the {@link ActorSystem} cannot be provided
+	 */
+	ActorSystem get() throws FlinkException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ccfe9bb8/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index a157d34..373267f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -213,20 +213,6 @@ public abstract class ClusterClient<T> {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Interface which allows to load an {@link ActorSystem}.
-	 */
-	public interface ActorSystemLoader extends AutoCloseable {
-
-		/**
-		 * Get an {@link ActorSystem}.
-		 *
-		 * @return {@link ActorSystem}
-		 * @throws FlinkException
-		 */
-		ActorSystem get() throws FlinkException;
-	}
-
-	/**
 	 * Utility class to lazily instantiate an {@link ActorSystem}.
 	 */
 	protected static class LazyActorSystemLoader implements ActorSystemLoader {

http://git-wip-us.apache.org/repos/asf/flink/blob/ccfe9bb8/flink-clients/src/main/java/org/apache/flink/client/program/DefaultActorSystemLoader.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultActorSystemLoader.java b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultActorSystemLoader.java
new file mode 100644
index 0000000..1f80e9e
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultActorSystemLoader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import akka.actor.ActorSystem;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link ActorSystemLoader} which simply returns the {@link ActorSystem} that was
+ * passed in at construction time.
+ */
+public class DefaultActorSystemLoader implements ActorSystemLoader {
+
+	@Nonnull
+	private final ActorSystem actorSystem;
+
+	public DefaultActorSystemLoader(@Nonnull ActorSystem actorSystem) {
+		this.actorSystem = actorSystem;
+	}
+
+	@Override
+	public ActorSystem get() {
+		return actorSystem;
+	}
+
+	@Override
+	public void close() throws Exception {
+		// noop
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ccfe9bb8/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index a5aca5d..c3b1444 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.ActorSystemLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;


[5/9] flink git commit: [hotfix] Introduce builder for MiniClusterResourceConfiguration

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index a5b01bc..5aabd31 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 
 import static org.junit.Assert.fail;
 
@@ -55,10 +56,11 @@ public class StreamingScalabilityAndLatency {
 			config.setInteger("taskmanager.net.client.numThreads", 1);
 
 			cluster = new MiniClusterResource(
-				new MiniClusterResource.MiniClusterResourceConfiguration(
-					config,
-					taskManagers,
-					slotsPerTaskManager));
+				new MiniClusterResourceConfiguration.Builder()
+					.setConfiguration(config)
+					.setNumberTaskManagers(taskManagers)
+					.setNumberSlotsPerTaskManager(slotsPerTaskManager)
+					.build());
 			cluster.before();
 
 			runPartitioningProgram(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 074b721..189962c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -24,12 +24,11 @@ import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterResource.MiniClusterType;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -56,10 +55,10 @@ public class AutoParallelismITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResourceConfiguration(
-			new Configuration(),
-			NUM_TM,
-			SLOTS_PER_TM));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(NUM_TM)
+			.setNumberSlotsPerTaskManager(SLOTS_PER_TM)
+			.build());
 
 	@Test
 	public void testProgramWithAutoParallelism() throws Exception {
@@ -81,7 +80,7 @@ public class AutoParallelismITCase extends TestLogger {
 			assertEquals(PARALLELISM, resultCollection.size());
 		}
 		catch (Exception ex) {
-			if (MINI_CLUSTER_RESOURCE.getMiniClusterType().equals(MiniClusterType.LEGACY)) {
+			if (MINI_CLUSTER_RESOURCE.getMiniClusterType().equals(TestBaseUtils.CodebaseType.LEGACY)) {
 				throw ex;
 			}
 			assertTrue(

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index ab937c9..ddc7dd8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.TestLogger;
 
@@ -50,10 +51,11 @@ public class CustomSerializationITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			1,
-			PARLLELISM));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(PARLLELISM)
+			.build());
 
 	public static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 6099934..6b3371d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -55,10 +56,10 @@ public class MiscellaneousIssuesITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new Configuration(),
-			2,
-			3));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(3)
+			.build());
 
 	@Test
 	public void testNullValues() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index e149f43..c905071 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
@@ -52,10 +53,11 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			8));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(8)
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
index 74b8cf7..a20a0d0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
@@ -27,11 +27,11 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.test.operators.util.CollectionDataSets;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -50,10 +50,10 @@ public class CustomDistributionITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new Configuration(),
-			1,
-			8));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(8)
+			.build());
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
index 424f540..d3247f2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.recovery;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 
 import org.junit.ClassRule;
 
@@ -31,10 +32,10 @@ public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCas
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			2));
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(2)
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
index 1f52d27..0ccb3fe 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.recovery;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 
 import org.junit.ClassRule;
 
@@ -31,10 +32,11 @@ public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecover
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			2));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(2)
+			.build());
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 4a7b345..5272581 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
@@ -60,10 +61,11 @@ public class IPv6HostnamesITCase extends TestLogger {
 
 	@Rule
 	public final MiniClusterResource miniClusterResource = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			2,
-			2));
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(2)
+			.setNumberSlotsPerTaskManager(2)
+			.build());
 
 	private Configuration getConfiguration() {
 		final Inet6Address ipv6address = getLocalIPv6Address();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
index e8914ec..5559935 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AssumptionViolatedException;
@@ -75,10 +75,11 @@ public class NettyEpollITCase extends TestLogger {
 			Configuration config = new Configuration();
 			config.setString(TRANSPORT_TYPE, "epoll");
 			MiniClusterResource cluster = new MiniClusterResource(
-				new MiniClusterResourceConfiguration(
-					config,
-					NUM_TASK_MANAGERS,
-					1),
+				new MiniClusterResourceConfiguration.Builder()
+					.setConfiguration(config)
+					.setNumberTaskManagers(NUM_TASK_MANAGERS)
+					.setNumberSlotsPerTaskManager(1)
+					.build(),
 				true);
 			cluster.before();
 			return cluster;

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index b5c233a..c2748a5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -235,11 +235,10 @@ public class NetworkStackThroughputITCase extends TestLogger {
 			final int numTaskManagers = parallelism / numSlotsPerTaskManager;
 
 			final MiniClusterResource cluster = new MiniClusterResource(
-				new MiniClusterResource.MiniClusterResourceConfiguration(
-					new Configuration(),
-					numTaskManagers,
-					numSlotsPerTaskManager
-				),
+				new MiniClusterResourceConfiguration.Builder()
+					.setNumberTaskManagers(numTaskManagers)
+					.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
+					.build(),
 				true
 			);
 			cluster.before();

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index b5c2aaf..60027c9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -35,6 +34,7 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -50,7 +50,6 @@ import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
@@ -73,10 +72,10 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			new Configuration(),
-			NUM_TMS,
-			NUM_SLOTS_PER_TM),
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+			.build(),
 		true);
 
 	private final boolean allowNonRestoredState;

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 3b46c82..21051c5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -78,10 +79,11 @@ public class TimestampITCase extends TestLogger {
 
 	@ClassRule
 	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
-		new MiniClusterResource.MiniClusterResourceConfiguration(
-			getConfiguration(),
-			NUM_TASK_MANAGERS,
-			NUM_TASK_SLOTS),
+		new MiniClusterResourceConfiguration.Builder()
+			.setConfiguration(getConfiguration())
+			.setNumberTaskManagers(NUM_TASK_MANAGERS)
+			.setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+			.build(),
 		true);
 
 	private static Configuration getConfiguration() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6de6ed48/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index bfb8c3d..514a3d5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -80,9 +80,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
 
-import static org.apache.flink.test.util.MiniClusterResource.CODEBASE_KEY;
-import static org.apache.flink.test.util.MiniClusterResource.NEW_CODEBASE;
-
 /**
  * This base class allows to use the MiniYARNCluster.
  * The cluster is re-used for all tests.
@@ -220,7 +217,8 @@ public abstract class YarnTestBase extends TestLogger {
 		}
 
 		flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
-		isNewMode = Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY));
+
+		isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
 	}
 
 	@Nullable
@@ -536,7 +534,7 @@ public abstract class YarnTestBase extends TestLogger {
 			FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
 
 			globalConfiguration.setString(CoreOptions.MODE,
-				Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? CoreOptions.NEW_MODE : CoreOptions.LEGACY_MODE);
+				Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()) ? CoreOptions.NEW_MODE : CoreOptions.LEGACY_MODE);
 
 			BootstrapTools.writeConfiguration(
 				globalConfiguration,


[3/9] flink git commit: [hotfix] Pass CodebaseType via the MiniClusterResourceConfiguration

Posted by tr...@apache.org.
[hotfix] Pass CodebaseType via the MiniClusterResourceConfiguration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8bb086d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8bb086d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8bb086d4

Branch: refs/heads/master
Commit: 8bb086d4309352cbf111e6031cd65d7f90dba6bc
Parents: 6de6ed4
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 21 15:02:04 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200

----------------------------------------------------------------------
 .../jobmanager/JMXJobManagerMetricTest.java     |  8 +++----
 .../webmonitor/handlers/JarRunHandlerTest.java  |  5 ++---
 .../webmonitor/history/HistoryServerTest.java   |  5 ++---
 .../flink/test/util/MiniClusterResource.java    | 22 +++-----------------
 .../test/example/client/JobRetrievalITCase.java |  5 ++---
 5 files changed, 13 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8bb086d4/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 90ba6d4..b7afca9 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -64,10 +64,10 @@ public class JMXJobManagerMetricTest extends TestLogger {
 	@ClassRule
 	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
 		new MiniClusterResourceConfiguration.Builder()
-		.setConfiguration(getConfiguration())
-		.setNumberSlotsPerTaskManager(1)
-		.setNumberTaskManagers(1)
-		.build(),
+			.setConfiguration(getConfiguration())
+			.setNumberSlotsPerTaskManager(1)
+			.setNumberTaskManagers(1)
+			.build(),
 		true);
 
 	private static Configuration getConfiguration() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8bb086d4/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index d26620b..3e8e633 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -68,9 +68,8 @@ public class JarRunHandlerTest {
 				.setConfiguration(config)
 				.setNumberTaskManagers(1)
 				.setNumberSlotsPerTaskManager(1)
-				.build(),
-			TestBaseUtils.CodebaseType.NEW
-		);
+				.setCodebaseType(TestBaseUtils.CodebaseType.NEW)
+				.build());
 		clusterResource.before();
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/8bb086d4/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 5d29fbb..4451e6d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -81,9 +81,8 @@ public class HistoryServerTest extends TestLogger {
 				.setConfiguration(clusterConfig)
 				.setNumberTaskManagers(1)
 				.setNumberSlotsPerTaskManager(1)
-				.build(),
-			TestBaseUtils.CodebaseType.NEW
-		);
+				.setCodebaseType(TestBaseUtils.CodebaseType.NEW)
+				.build());
 		cluster.before();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8bb086d4/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index c412bbf..bbb945f 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -71,26 +71,10 @@ public class MiniClusterResource extends ExternalResource {
 	}
 
 	public MiniClusterResource(
-		final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
-		final TestBaseUtils.CodebaseType miniClusterType) {
-		this(miniClusterResourceConfiguration, miniClusterType, false);
-	}
-
-	public MiniClusterResource(
-		final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
-		final boolean enableClusterClient) {
-		this(
-			miniClusterResourceConfiguration,
-			miniClusterResourceConfiguration.getCodebaseType(),
-			enableClusterClient);
-	}
-
-	private MiniClusterResource(
-		final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
-		final TestBaseUtils.CodebaseType miniClusterType,
-		final boolean enableClusterClient) {
+			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+			final boolean enableClusterClient) {
 		this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
-		this.miniClusterType = Preconditions.checkNotNull(miniClusterType);
+		this.miniClusterType = miniClusterResourceConfiguration.getCodebaseType();
 		this.enableClusterClient = enableClusterClient;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8bb086d4/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 93bec74..66a894f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -58,9 +58,8 @@ public class JobRetrievalITCase extends TestLogger {
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(1)
 			.setNumberSlotsPerTaskManager(4)
-			.build(),
-		TestBaseUtils.CodebaseType.NEW
-	);
+			.setCodebaseType(TestBaseUtils.CodebaseType.NEW)
+			.build());
 
 	private RestClusterClient<StandaloneClusterId> client;