You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 12:24:24 UTC

[3/3] flink git commit: [hotfix] Add JobMasterTest#testSlotRequestTimeoutWhenNoSlotOffering

[hotfix] Add JobMasterTest#testSlotRequestTimeoutWhenNoSlotOffering

The JobMasterTest#testSlotRequestTimeoutWhenNoSlotOffering verifies that the JM
will retry a job scheduling if one of its TMs does not properly offer a slot. The
mechanism which triggers this behaviour is the slot request timeout which fails the
ongoing scheduling operation if the slot requests are not fulfilled.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17f0e850
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17f0e850
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17f0e850

Branch: refs/heads/master
Commit: 17f0e850fd26da0c50195d3d9daa423ead1fbe3e
Parents: d5de2bc
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 23:57:01 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 14:24:02 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMasterTest.java  | 152 +++++++++++++++++--
 .../resourcemanager/ResourceManagerTest.java    |   4 +-
 .../slotmanager/SlotManagerTest.java            |   3 +-
 .../TestingTaskExecutorGateway.java             |  38 ++---
 .../TestingTaskExecutorGatewayBuilder.java      |  75 +++++++++
 5 files changed, 230 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index c0c9162..d8f33fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -36,12 +39,15 @@ import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -51,14 +57,18 @@ import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.New;
 import org.apache.flink.util.TestLogger;
@@ -78,11 +88,16 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 
 /**
  * Tests for {@link JobMaster}.
@@ -95,9 +110,11 @@ public class JobMasterTest extends TestLogger {
 
 	private static final Time testingTimeout = Time.seconds(10L);
 
-	private static final long heartbeatInterval = 1L;
+	private static final long fastHeartbeatInterval = 1L;
+	private static final long fastHeartbeatTimeout = 5L;
 
-	private static final long heartbeatTimeout = 5L;
+	private static final long heartbeatInterval = 1000L;
+	private static final long heartbeatTimeout = 5000L;
 
 	private static final JobGraph jobGraph = new JobGraph();
 
@@ -105,6 +122,8 @@ public class JobMasterTest extends TestLogger {
 
 	private static HeartbeatServices fastHeartbeatServices;
 
+	private static HeartbeatServices heartbeatServices;
+
 	private BlobServer blobServer;
 
 	private Configuration configuration;
@@ -123,7 +142,8 @@ public class JobMasterTest extends TestLogger {
 	public static void setupClass() {
 		rpcService = new TestingRpcService();
 
-		fastHeartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, rpcService.getScheduledExecutor());
+		fastHeartbeatServices = new TestingHeartbeatServices(fastHeartbeatInterval, fastHeartbeatTimeout, rpcService.getScheduledExecutor());
+		heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, rpcService.getScheduledExecutor());
 	}
 
 	@Before
@@ -157,6 +177,8 @@ public class JobMasterTest extends TestLogger {
 		if (blobServer != null) {
 			blobServer.close();
 		}
+
+		rpcService.clearGateways();
 	}
 
 	@AfterClass
@@ -169,14 +191,13 @@ public class JobMasterTest extends TestLogger {
 
 	@Test
 	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
-		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
-		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
-
 		final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>();
 		final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
-
-		taskExecutorGateway.setHeartbeatJobManagerConsumer(heartbeatResourceIdFuture::complete);
-		taskExecutorGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0));
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setHeartbeatJobManagerConsumer(heartbeatResourceIdFuture::complete)
+			.setDisconnectJobManagerConsumer((jobId, throwable) -> disconnectedJobManagerFuture.complete(jobId))
+			.createTestingTaskExecutorGateway();
 
 		rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
 
@@ -224,7 +245,7 @@ public class JobMasterTest extends TestLogger {
 		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(
 			resourceManagerId,
 			rmResourceId,
-			heartbeatInterval,
+			fastHeartbeatInterval,
 			"localhost",
 			"localhost");
 
@@ -306,7 +327,7 @@ public class JobMasterTest extends TestLogger {
 
 			assertThat(savepointCheckpoint, Matchers.notNullValue());
 
-			assertThat(savepointCheckpoint.getCheckpointID(), Matchers.is(savepointId));
+			assertThat(savepointCheckpoint.getCheckpointID(), is(savepointId));
 		} finally {
 			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
 		}
@@ -356,13 +377,103 @@ public class JobMasterTest extends TestLogger {
 
 			assertThat(savepointCheckpoint, Matchers.notNullValue());
 
-			assertThat(savepointCheckpoint.getCheckpointID(), Matchers.is(checkpointId));
+			assertThat(savepointCheckpoint.getCheckpointID(), is(checkpointId));
 		} finally {
 			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
 		}
 	}
 
 	/**
+	 * Tests that the JobMaster retries the scheduling of a job
+	 * in case of a missing slot offering from a registered TaskExecutor
+	 */
+	@Test
+	public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception {
+		final JobGraph restartingJobGraph = createSingleVertexJobWithRestartStrategy();
+
+		final long slotRequestTimeout = 10L;
+		configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout);
+
+		final JobMaster jobMaster = createJobMaster(
+			JobMasterConfiguration.fromConfiguration(configuration),
+			restartingJobGraph,
+			haServices,
+			new TestingJobManagerSharedServicesBuilder().build(),
+			heartbeatServices);
+
+		final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+		try {
+			final long start = System.nanoTime();
+			jobMaster.start(JobMasterId.generate(), testingTimeout).get();
+
+			final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+			final ArrayBlockingQueue<SlotRequest> blockingQueue = new ArrayBlockingQueue<>(2);
+			resourceManagerGateway.setRequestSlotConsumer(blockingQueue::offer);
+
+			rpcService.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
+			rmLeaderRetrievalService.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
+
+			// wait for the first slot request
+			blockingQueue.take();
+
+			final CompletableFuture<TaskDeploymentDescriptor> submittedTaskFuture = new CompletableFuture<>();
+			final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+			final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+				.setSubmitTaskConsumer((tdd, ignored) -> {
+					submittedTaskFuture.complete(tdd);
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				})
+				.createTestingTaskExecutorGateway();
+
+			rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
+
+			jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get();
+
+			// wait for the slot request timeout
+			final SlotRequest slotRequest = blockingQueue.take();
+			final long end = System.nanoTime();
+
+			// we rely on the slot request timeout to fail a stuck scheduling operation
+			assertThat((end-start) / 1_000_000L, Matchers.greaterThanOrEqualTo(slotRequestTimeout));
+
+			assertThat(submittedTaskFuture.isDone(), is(false));
+
+			final SlotOffer slotOffer = new SlotOffer(slotRequest.getAllocationId(), 0, ResourceProfile.UNKNOWN);
+
+			final CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout);
+
+			final Collection<SlotOffer> acceptedSlots = acceptedSlotsFuture.get();
+
+			assertThat(acceptedSlots, hasSize(1));
+			final SlotOffer acceptedSlot = acceptedSlots.iterator().next();
+
+			assertThat(acceptedSlot.getAllocationId(), equalTo(slotRequest.getAllocationId()));
+
+			// wait for the deployed task
+			final TaskDeploymentDescriptor taskDeploymentDescriptor = submittedTaskFuture.get();
+
+			assertThat(taskDeploymentDescriptor.getAllocationId(), equalTo(slotRequest.getAllocationId()));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
+	private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
+		final JobVertex jobVertex = new JobVertex("Test vertex");
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		final ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+		final JobGraph jobGraph = new JobGraph(jobVertex);
+		jobGraph.setAllowQueuedScheduling(true);
+		jobGraph.setExecutionConfig(executionConfig);
+
+		return jobGraph;
+	}
+
+	/**
 	 * Tests that we can close an unestablished ResourceManager connection.
 	 */
 	@Test
@@ -450,6 +561,21 @@ public class JobMasterTest extends TestLogger {
 			JobGraph jobGraph,
 			HighAvailabilityServices highAvailabilityServices,
 			JobManagerSharedServices jobManagerSharedServices) throws Exception {
+		return createJobMaster(
+			jobMasterConfiguration,
+			jobGraph,
+			highAvailabilityServices,
+			jobManagerSharedServices,
+			fastHeartbeatServices);
+	}
+
+	@Nonnull
+	private JobMaster createJobMaster(
+		JobMasterConfiguration jobMasterConfiguration,
+		JobGraph jobGraph,
+		HighAvailabilityServices highAvailabilityServices,
+		JobManagerSharedServices jobManagerSharedServices,
+		HeartbeatServices heartbeatServices) throws Exception {
 		return new JobMaster(
 			rpcService,
 			jobMasterConfiguration,
@@ -457,7 +583,7 @@ public class JobMasterTest extends TestLogger {
 			jobGraph,
 			highAvailabilityServices,
 			jobManagerSharedServices,
-			fastHeartbeatServices,
+			heartbeatServices,
 			blobServer,
 			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 			new NoOpOnCompletionActions(),

http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 7dab685..ebcb295 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.New;
@@ -105,7 +105,7 @@ public class ResourceManagerTest extends TestLogger {
 		try {
 			final ResourceID taskManagerId = ResourceID.generate();
 			final ResourceManagerGateway resourceManagerGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-			final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
+			final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 
 			// first make the ResourceManager the leader
 			resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();

http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 59de473..7f6736c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.testutils.category.New;
@@ -1091,7 +1092,7 @@ public class SlotManagerTest extends TestLogger {
 	public void testReportAllocatedSlot() throws Exception {
 		final ResourceID taskManagerId = ResourceID.generate();
 		final ResourceActions resourceActions = mock(ResourceActions.class);
-		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
+		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway);
 
 		try (final SlotManager slotManager = new SlotManager(

http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 43b0be2..1c48307 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -40,6 +39,8 @@ import org.apache.flink.util.Preconditions;
 import org.junit.experimental.categories.Category;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 
 /**
@@ -52,25 +53,18 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	private final String hostname;
 
-	private volatile Consumer<ResourceID> heartbeatJobManagerConsumer;
+	private final Consumer<ResourceID> heartbeatJobManagerConsumer;
 
-	private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
+	private final BiConsumer<JobID, Throwable> disconnectJobManagerConsumer;
 
-	public TestingTaskExecutorGateway() {
-		this("foobar:1234", "foobar");
-	}
+	private final BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer;
 
-	public TestingTaskExecutorGateway(String address, String hostname) {
+	public TestingTaskExecutorGateway(String address, String hostname, Consumer<ResourceID> heartbeatJobManagerConsumer, BiConsumer<JobID, Throwable> disconnectJobManagerConsumer, BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer) {
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
-	}
-
-	public void setHeartbeatJobManagerConsumer(Consumer<ResourceID> heartbeatJobManagerConsumer) {
-		this.heartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
-	}
-
-	public void setDisconnectJobManagerConsumer(Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer) {
-		this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
+		this.heartbeatJobManagerConsumer = Preconditions.checkNotNull(heartbeatJobManagerConsumer);
+		this.disconnectJobManagerConsumer = Preconditions.checkNotNull(disconnectJobManagerConsumer);
+		this.submitTaskConsumer = Preconditions.checkNotNull(submitTaskConsumer);
 	}
 
 	@Override
@@ -91,7 +85,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	@Override
 	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
-		return CompletableFuture.completedFuture(Acknowledge.get());
+		return submitTaskConsumer.apply(tdd, jobMasterId);
 	}
 
 	@Override
@@ -126,11 +120,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	@Override
 	public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
-		final Consumer<ResourceID> currentHeartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
-
-		if (currentHeartbeatJobManagerConsumer != null) {
-			currentHeartbeatJobManagerConsumer.accept(heartbeatOrigin);
-		}
+		heartbeatJobManagerConsumer.accept(heartbeatOrigin);
 	}
 
 	@Override
@@ -140,11 +130,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	@Override
 	public void disconnectJobManager(JobID jobId, Exception cause) {
-		final Consumer<Tuple2<JobID, Throwable>> currentDisconnectJobManagerConsumer = disconnectJobManagerConsumer;
-
-		if (currentDisconnectJobManagerConsumer != null) {
-			currentDisconnectJobManagerConsumer.accept(Tuple2.of(jobId, cause));
-		}
+		disconnectJobManagerConsumer.accept(jobId, cause);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
new file mode 100644
index 0000000..2550abf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+/**
+ * Builder for a {@link TestingTaskExecutorGateway}.
+ */
+public class TestingTaskExecutorGatewayBuilder {
+
+	private static final Consumer<ResourceID> NOOP_HEARTBEAT_JOBMANAGER_CONSUMER = ignored -> {};
+	private static final BiConsumer<JobID, Throwable> NOOP_DISCONNECT_JOBMANAGER_CONSUMER = (ignoredA, ignoredB) -> {};
+	private static final BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> NOOP_SUBMIT_TASK_CONSUMER = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get());
+
+	private String address = "foobar:1234";
+	private String hostname = "foobar";
+	private Consumer<ResourceID> heartbeatJobManagerConsumer = NOOP_HEARTBEAT_JOBMANAGER_CONSUMER;
+	private BiConsumer<JobID, Throwable> disconnectJobManagerConsumer = NOOP_DISCONNECT_JOBMANAGER_CONSUMER;
+	private BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer = NOOP_SUBMIT_TASK_CONSUMER;
+
+	public TestingTaskExecutorGatewayBuilder setAddress(String address) {
+		this.address = address;
+		return this;
+	}
+
+	public TestingTaskExecutorGatewayBuilder setHostname(String hostname) {
+		this.hostname = hostname;
+		return this;
+	}
+
+	public TestingTaskExecutorGatewayBuilder setHeartbeatJobManagerConsumer(Consumer<ResourceID> heartbeatJobManagerConsumer) {
+		this.heartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
+		return this;
+	}
+
+	public TestingTaskExecutorGatewayBuilder setDisconnectJobManagerConsumer(BiConsumer<JobID, Throwable> disconnectJobManagerConsumer) {
+		this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
+		return this;
+	}
+
+	public TestingTaskExecutorGatewayBuilder setSubmitTaskConsumer(BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer) {
+		this.submitTaskConsumer = submitTaskConsumer;
+		return this;
+	}
+
+	public TestingTaskExecutorGateway createTestingTaskExecutorGateway() {
+		return new TestingTaskExecutorGateway(address, hostname, heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer);
+	}
+}