You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 12:24:24 UTC
[3/3] flink git commit: [hotfix] Add JobMasterTest#testSlotRequestTimeoutWhenNoSlotOffering
[hotfix] Add JobMasterTest#testSlotRequestTimeoutWhenNoSlotOffering
JobMasterTest#testSlotRequestTimeoutWhenNoSlotOffering verifies that the JobMaster (JM)
retries the scheduling of a job if one of its TaskManagers (TMs) does not properly offer
a slot. The mechanism that triggers this behaviour is the slot request timeout, which
fails the ongoing scheduling operation if the slot requests are not fulfilled.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17f0e850
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17f0e850
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17f0e850
Branch: refs/heads/master
Commit: 17f0e850fd26da0c50195d3d9daa423ead1fbe3e
Parents: d5de2bc
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 23:57:01 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 14:24:02 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/jobmaster/JobMasterTest.java | 152 +++++++++++++++++--
.../resourcemanager/ResourceManagerTest.java | 4 +-
.../slotmanager/SlotManagerTest.java | 3 +-
.../TestingTaskExecutorGateway.java | 38 ++---
.../TestingTaskExecutorGatewayBuilder.java | 75 +++++++++
5 files changed, 230 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index c0c9162..d8f33fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -18,11 +18,14 @@
package org.apache.flink.runtime.jobmaster;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -36,12 +39,15 @@ import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -51,14 +57,18 @@ import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.category.New;
import org.apache.flink.util.TestLogger;
@@ -78,11 +88,16 @@ import javax.annotation.Nonnull;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
/**
* Tests for {@link JobMaster}.
@@ -95,9 +110,11 @@ public class JobMasterTest extends TestLogger {
private static final Time testingTimeout = Time.seconds(10L);
- private static final long heartbeatInterval = 1L;
+ private static final long fastHeartbeatInterval = 1L;
+ private static final long fastHeartbeatTimeout = 5L;
- private static final long heartbeatTimeout = 5L;
+ private static final long heartbeatInterval = 1000L;
+ private static final long heartbeatTimeout = 5000L;
private static final JobGraph jobGraph = new JobGraph();
@@ -105,6 +122,8 @@ public class JobMasterTest extends TestLogger {
private static HeartbeatServices fastHeartbeatServices;
+ private static HeartbeatServices heartbeatServices;
+
private BlobServer blobServer;
private Configuration configuration;
@@ -123,7 +142,8 @@ public class JobMasterTest extends TestLogger {
public static void setupClass() {
rpcService = new TestingRpcService();
- fastHeartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, rpcService.getScheduledExecutor());
+ fastHeartbeatServices = new TestingHeartbeatServices(fastHeartbeatInterval, fastHeartbeatTimeout, rpcService.getScheduledExecutor());
+ heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, rpcService.getScheduledExecutor());
}
@Before
@@ -157,6 +177,8 @@ public class JobMasterTest extends TestLogger {
if (blobServer != null) {
blobServer.close();
}
+
+ rpcService.clearGateways();
}
@AfterClass
@@ -169,14 +191,13 @@ public class JobMasterTest extends TestLogger {
@Test
public void testHeartbeatTimeoutWithTaskManager() throws Exception {
- final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
- final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
-
final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>();
final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
-
- taskExecutorGateway.setHeartbeatJobManagerConsumer(heartbeatResourceIdFuture::complete);
- taskExecutorGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0));
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setHeartbeatJobManagerConsumer(heartbeatResourceIdFuture::complete)
+ .setDisconnectJobManagerConsumer((jobId, throwable) -> disconnectedJobManagerFuture.complete(jobId))
+ .createTestingTaskExecutorGateway();
rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
@@ -224,7 +245,7 @@ public class JobMasterTest extends TestLogger {
final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(
resourceManagerId,
rmResourceId,
- heartbeatInterval,
+ fastHeartbeatInterval,
"localhost",
"localhost");
@@ -306,7 +327,7 @@ public class JobMasterTest extends TestLogger {
assertThat(savepointCheckpoint, Matchers.notNullValue());
- assertThat(savepointCheckpoint.getCheckpointID(), Matchers.is(savepointId));
+ assertThat(savepointCheckpoint.getCheckpointID(), is(savepointId));
} finally {
RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
}
@@ -356,13 +377,103 @@ public class JobMasterTest extends TestLogger {
assertThat(savepointCheckpoint, Matchers.notNullValue());
- assertThat(savepointCheckpoint.getCheckpointID(), Matchers.is(checkpointId));
+ assertThat(savepointCheckpoint.getCheckpointID(), is(checkpointId));
} finally {
RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
}
}
/**
+ * Tests that the JobMaster retries the scheduling of a job
+ * in case of a missing slot offering from a registered TaskExecutor.
+ */
+ @Test
+ public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception {
+ final JobGraph restartingJobGraph = createSingleVertexJobWithRestartStrategy();
+
+ final long slotRequestTimeout = 10L;
+ configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout);
+
+ final JobMaster jobMaster = createJobMaster(
+ JobMasterConfiguration.fromConfiguration(configuration),
+ restartingJobGraph,
+ haServices,
+ new TestingJobManagerSharedServicesBuilder().build(),
+ heartbeatServices);
+
+ final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+ try {
+ final long start = System.nanoTime();
+ jobMaster.start(JobMasterId.generate(), testingTimeout).get();
+
+ final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ final ArrayBlockingQueue<SlotRequest> blockingQueue = new ArrayBlockingQueue<>(2);
+ resourceManagerGateway.setRequestSlotConsumer(blockingQueue::offer);
+
+ rpcService.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
+ rmLeaderRetrievalService.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
+
+ // wait for the first slot request
+ blockingQueue.take();
+
+ final CompletableFuture<TaskDeploymentDescriptor> submittedTaskFuture = new CompletableFuture<>();
+ final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setSubmitTaskConsumer((tdd, ignored) -> {
+ submittedTaskFuture.complete(tdd);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .createTestingTaskExecutorGateway();
+
+ rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
+
+ jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get();
+
+ // wait for the slot request timeout
+ final SlotRequest slotRequest = blockingQueue.take();
+ final long end = System.nanoTime();
+
+ // we rely on the slot request timeout to fail a stuck scheduling operation
+ assertThat((end-start) / 1_000_000L, Matchers.greaterThanOrEqualTo(slotRequestTimeout));
+
+ assertThat(submittedTaskFuture.isDone(), is(false));
+
+ final SlotOffer slotOffer = new SlotOffer(slotRequest.getAllocationId(), 0, ResourceProfile.UNKNOWN);
+
+ final CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout);
+
+ final Collection<SlotOffer> acceptedSlots = acceptedSlotsFuture.get();
+
+ assertThat(acceptedSlots, hasSize(1));
+ final SlotOffer acceptedSlot = acceptedSlots.iterator().next();
+
+ assertThat(acceptedSlot.getAllocationId(), equalTo(slotRequest.getAllocationId()));
+
+ // wait for the deployed task
+ final TaskDeploymentDescriptor taskDeploymentDescriptor = submittedTaskFuture.get();
+
+ assertThat(taskDeploymentDescriptor.getAllocationId(), equalTo(slotRequest.getAllocationId()));
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+ }
+ }
+
+ private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
+ final JobVertex jobVertex = new JobVertex("Test vertex");
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+
+ final ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+ final JobGraph jobGraph = new JobGraph(jobVertex);
+ jobGraph.setAllowQueuedScheduling(true);
+ jobGraph.setExecutionConfig(executionConfig);
+
+ return jobGraph;
+ }
+
+ /**
* Tests that we can close an unestablished ResourceManager connection.
*/
@Test
@@ -450,6 +561,21 @@ public class JobMasterTest extends TestLogger {
JobGraph jobGraph,
HighAvailabilityServices highAvailabilityServices,
JobManagerSharedServices jobManagerSharedServices) throws Exception {
+ return createJobMaster(
+ jobMasterConfiguration,
+ jobGraph,
+ highAvailabilityServices,
+ jobManagerSharedServices,
+ fastHeartbeatServices);
+ }
+
+ @Nonnull
+ private JobMaster createJobMaster(
+ JobMasterConfiguration jobMasterConfiguration,
+ JobGraph jobGraph,
+ HighAvailabilityServices highAvailabilityServices,
+ JobManagerSharedServices jobManagerSharedServices,
+ HeartbeatServices heartbeatServices) throws Exception {
return new JobMaster(
rpcService,
jobMasterConfiguration,
@@ -457,7 +583,7 @@ public class JobMasterTest extends TestLogger {
jobGraph,
highAvailabilityServices,
jobManagerSharedServices,
- fastHeartbeatServices,
+ heartbeatServices,
blobServer,
UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
new NoOpOnCompletionActions(),
http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 7dab685..ebcb295 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.category.New;
@@ -105,7 +105,7 @@ public class ResourceManagerTest extends TestLogger {
try {
final ResourceID taskManagerId = ResourceID.generate();
final ResourceManagerGateway resourceManagerGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
- final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
+ final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
// first make the ResourceManager the leader
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 59de473..7f6736c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.testutils.category.New;
@@ -1091,7 +1092,7 @@ public class SlotManagerTest extends TestLogger {
public void testReportAllocatedSlot() throws Exception {
final ResourceID taskManagerId = ResourceID.generate();
final ResourceActions resourceActions = mock(ResourceActions.class);
- final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
+ final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway);
try (final SlotManager slotManager = new SlotManager(
http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 43b0be2..1c48307 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -40,6 +39,8 @@ import org.apache.flink.util.Preconditions;
import org.junit.experimental.categories.Category;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
/**
@@ -52,25 +53,18 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
private final String hostname;
- private volatile Consumer<ResourceID> heartbeatJobManagerConsumer;
+ private final Consumer<ResourceID> heartbeatJobManagerConsumer;
- private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
+ private final BiConsumer<JobID, Throwable> disconnectJobManagerConsumer;
- public TestingTaskExecutorGateway() {
- this("foobar:1234", "foobar");
- }
+ private final BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer;
- public TestingTaskExecutorGateway(String address, String hostname) {
+ public TestingTaskExecutorGateway(String address, String hostname, Consumer<ResourceID> heartbeatJobManagerConsumer, BiConsumer<JobID, Throwable> disconnectJobManagerConsumer, BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer) {
this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
- }
-
- public void setHeartbeatJobManagerConsumer(Consumer<ResourceID> heartbeatJobManagerConsumer) {
- this.heartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
- }
-
- public void setDisconnectJobManagerConsumer(Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer) {
- this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
+ this.heartbeatJobManagerConsumer = Preconditions.checkNotNull(heartbeatJobManagerConsumer);
+ this.disconnectJobManagerConsumer = Preconditions.checkNotNull(disconnectJobManagerConsumer);
+ this.submitTaskConsumer = Preconditions.checkNotNull(submitTaskConsumer);
}
@Override
@@ -91,7 +85,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
@Override
public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
- return CompletableFuture.completedFuture(Acknowledge.get());
+ return submitTaskConsumer.apply(tdd, jobMasterId);
}
@Override
@@ -126,11 +120,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
@Override
public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
- final Consumer<ResourceID> currentHeartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
-
- if (currentHeartbeatJobManagerConsumer != null) {
- currentHeartbeatJobManagerConsumer.accept(heartbeatOrigin);
- }
+ heartbeatJobManagerConsumer.accept(heartbeatOrigin);
}
@Override
@@ -140,11 +130,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
@Override
public void disconnectJobManager(JobID jobId, Exception cause) {
- final Consumer<Tuple2<JobID, Throwable>> currentDisconnectJobManagerConsumer = disconnectJobManagerConsumer;
-
- if (currentDisconnectJobManagerConsumer != null) {
- currentDisconnectJobManagerConsumer.accept(Tuple2.of(jobId, cause));
- }
+ disconnectJobManagerConsumer.accept(jobId, cause);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
new file mode 100644
index 0000000..2550abf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+/**
+ * Builder for a {@link TestingTaskExecutorGateway}.
+ */
+public class TestingTaskExecutorGatewayBuilder {
+
+ private static final Consumer<ResourceID> NOOP_HEARTBEAT_JOBMANAGER_CONSUMER = ignored -> {};
+ private static final BiConsumer<JobID, Throwable> NOOP_DISCONNECT_JOBMANAGER_CONSUMER = (ignoredA, ignoredB) -> {};
+ private static final BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> NOOP_SUBMIT_TASK_CONSUMER = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get());
+
+ private String address = "foobar:1234";
+ private String hostname = "foobar";
+ private Consumer<ResourceID> heartbeatJobManagerConsumer = NOOP_HEARTBEAT_JOBMANAGER_CONSUMER;
+ private BiConsumer<JobID, Throwable> disconnectJobManagerConsumer = NOOP_DISCONNECT_JOBMANAGER_CONSUMER;
+ private BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer = NOOP_SUBMIT_TASK_CONSUMER;
+
+ public TestingTaskExecutorGatewayBuilder setAddress(String address) {
+ this.address = address;
+ return this;
+ }
+
+ public TestingTaskExecutorGatewayBuilder setHostname(String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ public TestingTaskExecutorGatewayBuilder setHeartbeatJobManagerConsumer(Consumer<ResourceID> heartbeatJobManagerConsumer) {
+ this.heartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
+ return this;
+ }
+
+ public TestingTaskExecutorGatewayBuilder setDisconnectJobManagerConsumer(BiConsumer<JobID, Throwable> disconnectJobManagerConsumer) {
+ this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
+ return this;
+ }
+
+ public TestingTaskExecutorGatewayBuilder setSubmitTaskConsumer(BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer) {
+ this.submitTaskConsumer = submitTaskConsumer;
+ return this;
+ }
+
+ public TestingTaskExecutorGateway createTestingTaskExecutorGateway() {
+ return new TestingTaskExecutorGateway(address, hostname, heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer);
+ }
+}