You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/28 22:16:06 UTC

[flink] branch master updated (3138734 -> 9856592)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3138734  fix typo in JobMaster (#7572)
     new 9fb3025  [FLINK-11414] Introduce JobMasterService interface
     new 9856592  [hotfix] Remove RPC timeouts from JobMasterService lifecycle methods

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/jobmaster/JobManagerRunner.java  | 20 +++----
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 32 +++++++----
 .../flink/runtime/jobmaster/JobMasterService.java  | 62 ++++++++++++++++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java     | 36 ++++++-------
 4 files changed, 110 insertions(+), 40 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java


[flink] 01/02: [FLINK-11414] Introduce JobMasterService interface

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9fb3025e12a38136fa583110f8f267091c3db19a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jan 22 16:49:21 2019 +0100

    [FLINK-11414] Introduce JobMasterService interface
    
    For a better separation of concerns in the JobManagerRunner, this commit introduces
    a JobMasterService which only exposes the JobMaster's lifecycle methods to the
    JobManagerRunner. This allows for an easier substitution when testing the JobManagerRunner.
    
    This closes #7563.
---
 .../flink/runtime/jobmaster/JobManagerRunner.java  | 15 +++--
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 17 +++++-
 .../flink/runtime/jobmaster/JobMasterService.java  | 65 ++++++++++++++++++++++
 3 files changed, 88 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index c10a535..e5e545f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -77,7 +77,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 	private final JobManagerSharedServices jobManagerSharedServices;
 
-	private final JobMaster jobMaster;
+	private final JobMasterService jobMasterService;
 
 	private final FatalErrorHandler fatalErrorHandler;
 
@@ -152,7 +152,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 				rpcService);
 
 			// now start the JobManager
-			this.jobMaster = new JobMaster(
+			this.jobMasterService = new JobMaster(
 				rpcService,
 				jobMasterConfiguration,
 				resourceId,
@@ -212,8 +212,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 				setNewLeaderGatewayFuture();
 				leaderGatewayFuture.completeExceptionally(new FlinkException("JobMaster has been shut down."));
 
-				jobMaster.shutDown();
-				final CompletableFuture<Void> jobManagerTerminationFuture = jobMaster.getTerminationFuture();
+				final CompletableFuture<Void> jobManagerTerminationFuture = jobMasterService.closeAsync();
 
 				jobManagerTerminationFuture.whenComplete(
 					(Void ignored, Throwable throwable) -> {
@@ -328,7 +327,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 			runningJobsRegistry.setJobRunning(jobGraph.getJobID());
 
-			final CompletableFuture<Acknowledge> startFuture = jobMaster.start(new JobMasterId(leaderSessionId), rpcTimeout);
+			final CompletableFuture<Acknowledge> startFuture = jobMasterService.start(new JobMasterId(leaderSessionId), rpcTimeout);
 			final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
 
 			startFuture.whenCompleteAsync(
@@ -345,7 +344,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 	private void confirmLeaderSessionIdIfStillLeader(UUID leaderSessionId, CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture) {
 		if (leaderElectionService.hasLeadership(leaderSessionId)) {
-			currentLeaderGatewayFuture.complete(jobMaster.getSelfGateway(JobMasterGateway.class));
+			currentLeaderGatewayFuture.complete(jobMasterService.getGateway());
 			leaderElectionService.confirmLeaderSessionID(leaderSessionId);
 		} else {
 			log.debug("Ignoring confirmation of leader session id because {} is no longer the leader.", getAddress());
@@ -365,7 +364,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 			setNewLeaderGatewayFuture();
 
-			CompletableFuture<Acknowledge>  suspendFuture = jobMaster.suspend(new FlinkException("JobManager is no longer the leader."), rpcTimeout);
+			CompletableFuture<Acknowledge>  suspendFuture = jobMasterService.suspend(new FlinkException("JobManager is no longer the leader."), rpcTimeout);
 
 			suspendFuture.whenCompleteAsync(
 				(Acknowledge ack, Throwable throwable) -> {
@@ -396,7 +395,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 	@Override
 	public String getAddress() {
-		return jobMaster.getAddress();
+		return jobMasterService.getAddress();
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c63e5f5..3679d86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -145,7 +145,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * given task</li>
  * </ul>
  */
-public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
+public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
 
 	/** Default names for Flink's distributed components. */
 	public static final String JOB_MANAGER_NAME = "jobmanager";
@@ -1500,6 +1500,21 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	//----------------------------------------------------------------------------------------------
+	// Service methods
+	//----------------------------------------------------------------------------------------------
+
+	@Override
+	public JobMasterGateway getGateway() {
+		return getSelfGateway(JobMasterGateway.class);
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		shutDown();
+		return getTerminationFuture();
+	}
+
+	//----------------------------------------------------------------------------------------------
 	// Utility classes
 	//----------------------------------------------------------------------------------------------
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java
new file mode 100644
index 0000000..09a5870
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AutoCloseableAsync;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface which specifies the JobMaster service.
+ */
+public interface JobMasterService extends AutoCloseableAsync {
+
+	/**
+	 * Start the JobMaster service with the given {@link JobMasterId}.
+	 *
+	 * @param jobMasterId to start the service with
+	 * @param rpcTimeout timeout of this operation
+	 * @return Future which is completed once the JobMaster service has been started
+	 * @throws Exception if the JobMaster service could not be started
+	 */
+	CompletableFuture<Acknowledge> start(JobMasterId jobMasterId, Time rpcTimeout) throws Exception;
+
+	/**
+	 * Suspend the JobMaster service. This means that the service will stop to react
+	 * to messages.
+	 *
+	 * @param cause for the suspension
+	 * @param rpcTimeout timeout of this operation
+	 * @return Future which is completed once the JobMaster service has been suspended
+	 */
+	CompletableFuture<Acknowledge> suspend(Exception cause, Time rpcTimeout);
+
+	/**
+	 * Get the {@link JobMasterGateway} belonging to this service.
+	 *
+	 * @return JobMasterGateway belonging to this service
+	 */
+	JobMasterGateway getGateway();
+
+	/**
+	 * Get the address of the JobMaster service under which it is reachable.
+	 *
+	 * @return Address of the JobMaster service
+	 */
+	String getAddress();
+}


[flink] 02/02: [hotfix] Remove RPC timeouts from JobMasterService lifecycle methods

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 985659283014efffc673d5501646a3d69f124936
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jan 22 16:54:14 2019 +0100

    [hotfix] Remove RPC timeouts from JobMasterService lifecycle methods
    
    The lifecycle methods should not have an RPC timeout. Instead, the implementations
    now use RpcUtils#INF_TIMEOUT as the timeout.
---
 .../flink/runtime/jobmaster/JobManagerRunner.java  |  9 ++----
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 15 +++++----
 .../flink/runtime/jobmaster/JobMasterService.java  |  7 ++---
 .../flink/runtime/jobmaster/JobMasterTest.java     | 36 +++++++++++-----------
 4 files changed, 29 insertions(+), 38 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index e5e545f..1ac2f80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -81,8 +80,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 	private final FatalErrorHandler fatalErrorHandler;
 
-	private final Time rpcTimeout;
-
 	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
 
 	private final CompletableFuture<Void> terminationFuture;
@@ -143,8 +140,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 			final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
 
-			this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
-
 			this.leaderGatewayFuture = new CompletableFuture<>();
 
 			final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(
@@ -327,7 +322,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 			runningJobsRegistry.setJobRunning(jobGraph.getJobID());
 
-			final CompletableFuture<Acknowledge> startFuture = jobMasterService.start(new JobMasterId(leaderSessionId), rpcTimeout);
+			final CompletableFuture<Acknowledge> startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
 			final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
 
 			startFuture.whenCompleteAsync(
@@ -364,7 +359,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 			setNewLeaderGatewayFuture();
 
-			CompletableFuture<Acknowledge>  suspendFuture = jobMasterService.suspend(new FlinkException("JobManager is no longer the leader."), rpcTimeout);
+			CompletableFuture<Acknowledge>  suspendFuture = jobMasterService.suspend(new FlinkException("JobManager is no longer the leader."));
 
 			suspendFuture.whenCompleteAsync(
 				(Acknowledge ack, Throwable throwable) -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 3679d86..62446b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -95,6 +95,7 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPre
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
@@ -311,14 +312,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	 * Start the rpc service and begin to run the job.
 	 *
 	 * @param newJobMasterId The necessary fencing token to run the job
-	 * @param timeout for the operation
 	 * @return Future acknowledge if the job could be started. Otherwise the future contains an exception
 	 */
-	public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId, final Time timeout) throws Exception {
+	public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
 		// make sure we receive RPC and async calls
 		super.start();
 
-		return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), timeout);
+		return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
 	}
 
 	/**
@@ -326,16 +326,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	 * will be disposed.
 	 *
 	 * <p>Mostly job is suspended because of the leadership has been revoked, one can be restart this job by
-	 * calling the {@link #start(JobMasterId, Time)} method once we take the leadership back again.
+	 * calling the {@link #start(JobMasterId)} method once we take the leadership back again.
 	 *
 	 * <p>This method is executed asynchronously
 	 *
 	 * @param cause The reason of why this job been suspended.
-	 * @param timeout for this operation
 	 * @return Future acknowledge indicating that the job has been suspended. Otherwise the future contains an exception
 	 */
-	public CompletableFuture<Acknowledge> suspend(final Exception cause, final Time timeout) {
-		CompletableFuture<Acknowledge> suspendFuture = callAsyncWithoutFencing(() -> suspendExecution(cause), timeout);
+	public CompletableFuture<Acknowledge> suspend(final Exception cause) {
+		CompletableFuture<Acknowledge> suspendFuture = callAsyncWithoutFencing(() -> suspendExecution(cause), RpcUtils.INF_TIMEOUT);
 
 		stop();
 
@@ -1058,7 +1057,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	 * will be disposed.
 	 *
 	 * <p>Mostly job is suspended because of the leadership has been revoked, one can be restart this job by
-	 * calling the {@link #start(JobMasterId, Time)} method once we take the leadership back again.
+	 * calling the {@link #start(JobMasterId)} method once we take the leadership back again.
 	 *
 	 * @param cause The reason of why this job been suspended.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java
index 09a5870..463287e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.util.AutoCloseableAsync;
 
@@ -33,21 +32,19 @@ public interface JobMasterService extends AutoCloseableAsync {
 	 * Start the JobMaster service with the given {@link JobMasterId}.
 	 *
 	 * @param jobMasterId to start the service with
-	 * @param rpcTimeout timeout of this operation
 	 * @return Future which is completed once the JobMaster service has been started
 	 * @throws Exception if the JobMaster service could not be started
 	 */
-	CompletableFuture<Acknowledge> start(JobMasterId jobMasterId, Time rpcTimeout) throws Exception;
+	CompletableFuture<Acknowledge> start(JobMasterId jobMasterId) throws Exception;
 
 	/**
 	 * Suspend the JobMaster service. This means that the service will stop to react
 	 * to messages.
 	 *
 	 * @param cause for the suspension
-	 * @param rpcTimeout timeout of this operation
 	 * @return Future which is completed once the JobMaster service has been suspended
 	 */
-	CompletableFuture<Acknowledge> suspend(Exception cause, Time rpcTimeout);
+	CompletableFuture<Acknowledge> suspend(Exception cause);
 
 	/**
 	 * Get the {@link JobMasterGateway} belonging to this service.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 9c51eab..df18c00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -282,7 +282,7 @@ public class JobMasterTest extends TestLogger {
 				}
 			};
 
-			jobMaster.start(jobMasterId, testingTimeout).get();
+			jobMaster.start(jobMasterId).get();
 
 			final String className = "UserException";
 			final URLClassLoader userClassLoader = ClassLoaderUtils.compileAndLoadJava(
@@ -334,7 +334,7 @@ public class JobMasterTest extends TestLogger {
 			haServices,
 			jobManagerSharedServices);
 
-		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 
 		try {
 			// wait for the start to complete
@@ -401,7 +401,7 @@ public class JobMasterTest extends TestLogger {
 			haServices,
 			jobManagerSharedServices);
 
-		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 
 		try {
 			// wait for the start operation to complete
@@ -640,7 +640,7 @@ public class JobMasterTest extends TestLogger {
 
 		try {
 			final long start = System.nanoTime();
-			jobMaster.start(JobMasterId.generate(), testingTimeout).get();
+			jobMaster.start(JobMasterId.generate()).get();
 
 			final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
 			final ArrayBlockingQueue<SlotRequest> blockingQueue = new ArrayBlockingQueue<>(2);
@@ -706,7 +706,7 @@ public class JobMasterTest extends TestLogger {
 			new TestingJobManagerSharedServicesBuilder().build());
 
 		try {
-			jobMaster.start(JobMasterId.generate(), testingTimeout).get();
+			jobMaster.start(JobMasterId.generate()).get();
 			final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 			final String firstResourceManagerAddress = "address1";
 			final String secondResourceManagerAddress = "address2";
@@ -755,7 +755,7 @@ public class JobMasterTest extends TestLogger {
 
 		final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
 
-		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 
 		try {
 			// wait for the start to complete
@@ -799,7 +799,7 @@ public class JobMasterTest extends TestLogger {
 			haServices,
 			new TestingJobManagerSharedServicesBuilder().build());
 
-		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 
 		try {
 			// wait for the start to complete
@@ -820,11 +820,11 @@ public class JobMasterTest extends TestLogger {
 
 			assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
 
-			jobMaster.suspend(new FlinkException("Test exception."), testingTimeout).get();
+			jobMaster.suspend(new FlinkException("Test exception.")).get();
 
 			final JobMasterId jobMasterId2 = JobMasterId.generate();
 
-			jobMaster.start(jobMasterId2, testingTimeout).get();
+			jobMaster.start(jobMasterId2).get();
 
 			final JobMasterId secondRegistrationAttempt = registrationQueue.take();
 
@@ -866,7 +866,7 @@ public class JobMasterTest extends TestLogger {
 			haServices,
 			jobManagerSharedServices);
 
-		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 
 		try {
 			// wait for the start to complete
@@ -991,7 +991,7 @@ public class JobMasterTest extends TestLogger {
 			new TestingJobManagerSharedServicesBuilder().build(),
 			heartbeatServices);
 
-		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 		final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
 
 		try {
@@ -1021,7 +1021,7 @@ public class JobMasterTest extends TestLogger {
 			new TestingJobManagerSharedServicesBuilder().build(),
 			heartbeatServices);
 
-		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 		final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
 
 		try {
@@ -1066,7 +1066,7 @@ public class JobMasterTest extends TestLogger {
 			new TestingJobManagerSharedServicesBuilder().build(),
 			heartbeatServices);
 
-		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 		final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
 
 		try {
@@ -1104,7 +1104,7 @@ public class JobMasterTest extends TestLogger {
 			new TestingJobManagerSharedServicesBuilder().build(),
 			heartbeatServices);
 
-		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 		final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
 
 		try {
@@ -1167,7 +1167,7 @@ public class JobMasterTest extends TestLogger {
 			new TestingJobManagerSharedServicesBuilder().build(),
 			heartbeatServices);
 
-		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 		final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
 
 		try {
@@ -1222,7 +1222,7 @@ public class JobMasterTest extends TestLogger {
 			new TestingJobManagerSharedServicesBuilder().build(),
 			heartbeatServices);
 
-		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 
 		try {
 			// wait for the start to complete
@@ -1342,7 +1342,7 @@ public class JobMasterTest extends TestLogger {
 		};
 
 		try {
-			final CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+			final CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
 			startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
@@ -1402,7 +1402,7 @@ public class JobMasterTest extends TestLogger {
 		rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway);
 
 		try {
-			jobMaster.start(jobMasterId, testingTimeout).get();
+			jobMaster.start(jobMasterId).get();
 
 			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);