You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 19:51:22 UTC
[flink] 02/02: [FLINK-10314] Making JobManagerRunner creation
non-blocking in Dispatcher
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 199f1758e9bdbdc316ec7168791a8e95373159a6
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 14 12:07:12 2018 +0200
[FLINK-10314] Making JobManagerRunner creation non-blocking in Dispatcher
Creating a JobManagerRunner can be a blocking operation, e.g. if the CheckpointCoordinator
needs to access a FileSystem. Therefore, this operation should not be executed in the main thread
of the Dispatcher; otherwise it would block this component.
This closes #6699.
---
.../apache/flink/util/function/FunctionUtils.java | 13 ++
.../flink/runtime/dispatcher/Dispatcher.java | 135 ++++++++++--------
.../flink/runtime/dispatcher/DispatcherTest.java | 156 +++++++++++++++++++--
.../dispatcher/TestingJobManagerRunnerFactory.java | 5 +-
.../utils/TestingResourceManagerGateway.java | 2 +-
5 files changed, 241 insertions(+), 70 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
index 678ef9f..83846a6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -32,6 +32,19 @@ public class FunctionUtils {
throw new UnsupportedOperationException("This class should never be instantiated.");
}
+ private static final Function<Object, Void> NULL_FN = ignored -> null;
+
+ /**
+ * Function which returns {@code null} (type: Void).
+ *
+ * @param <T> input type
+ * @return Function which returns {@code null}.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> Function<T, Void> nullFn() {
+ return (Function<T, Void>) NULL_FN;
+ }
+
/**
* Convert at {@link FunctionWithException} into a {@link Function}.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5279e50..a1da213 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -63,10 +63,11 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.util.function.FunctionUtils;
-import org.apache.flink.util.function.ThrowingConsumer;
-import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.flink.util.function.FunctionWithException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -110,7 +111,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
private final FatalErrorHandler fatalErrorHandler;
- private final Map<JobID, JobManagerRunner> jobManagerRunners;
+ private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures;
private final LeaderElectionService leaderElectionService;
@@ -166,7 +167,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
- jobManagerRunners = new HashMap<>(16);
+ jobManagerRunnerFutures = new HashMap<>(16);
leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
@@ -248,7 +249,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e));
}
- if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
+ if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId)) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
} else {
@@ -257,58 +258,72 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
return persistAndRunFuture.exceptionally(
(Throwable throwable) -> {
+ final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+ log.error("Failed to submit job {}.", jobId, strippedThrowable);
throw new CompletionException(
- new JobSubmissionException(jobId, "Failed to submit job.", throwable));
+ new JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable));
});
}
}
- private void persistAndRunJob(JobGraph jobGraph) throws Exception {
+ private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
- try {
- runJob(jobGraph);
- } catch (Exception e) {
- try {
+ final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
+
+ return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
+ if (throwable != null) {
submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
- } catch (Exception ie) {
- e.addSuppressed(ie);
}
-
- throw e;
- }
+ }));
}
- private void runJob(JobGraph jobGraph) throws Exception {
- Preconditions.checkState(!jobManagerRunners.containsKey(jobGraph.getJobID()));
+ private CompletableFuture<Void> runJob(JobGraph jobGraph) {
+ Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
- final JobManagerRunner jobManagerRunner = createJobManagerRunner(jobGraph);
+ final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
- jobManagerRunner.start();
+ jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
- jobManagerRunners.put(jobGraph.getJobID(), jobManagerRunner);
+ return jobManagerRunnerFuture
+ .thenApply(FunctionUtils.nullFn())
+ .whenCompleteAsync(
+ (ignored, throwable) -> {
+ if (throwable != null) {
+ jobManagerRunnerFutures.remove(jobGraph.getJobID());
+ }
+ },
+ getMainThreadExecutor());
}
- private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Exception {
- final JobID jobId = jobGraph.getJobID();
-
- final JobManagerRunner jobManagerRunner = jobManagerRunnerFactory.createJobManagerRunner(
- ResourceID.generate(),
- jobGraph,
- configuration,
- getRpcService(),
- highAvailabilityServices,
- heartbeatServices,
- blobServer,
- jobManagerSharedServices,
- new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
- fatalErrorHandler);
+ private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
+ final RpcService rpcService = getRpcService();
+
+ final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
+ CheckedSupplier.unchecked(() ->
+ jobManagerRunnerFactory.createJobManagerRunner(
+ ResourceID.generate(),
+ jobGraph,
+ configuration,
+ rpcService,
+ highAvailabilityServices,
+ heartbeatServices,
+ blobServer,
+ jobManagerSharedServices,
+ new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
+ fatalErrorHandler)),
+ rpcService.getExecutor());
+
+ return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
+ }
+ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
+ final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
jobManagerRunner.getResultFuture().whenCompleteAsync(
(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
// check if we are still the active JobManagerRunner by checking the identity
//noinspection ObjectEquality
- if (jobManagerRunner == jobManagerRunners.get(jobId)) {
+ if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
if (archivedExecutionGraph != null) {
jobReachedGloballyTerminalState(archivedExecutionGraph);
} else {
@@ -325,13 +340,15 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
}, getMainThreadExecutor());
+ jobManagerRunner.start();
+
return jobManagerRunner;
}
@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
return CompletableFuture.completedFuture(
- Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
+ Collections.unmodifiableSet(new HashSet<>(jobManagerRunnerFutures.keySet())));
}
@Override
@@ -481,9 +498,9 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
- final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
+ final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
- if (jobManagerRunner == null) {
+ if (jobManagerRunnerFuture == null) {
final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId);
if (archivedExecutionGraph == null) {
@@ -492,7 +509,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
return CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph));
}
} else {
- return jobManagerRunner.getResultFuture().thenApply(JobResult::createFrom);
+ return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture).thenApply(JobResult::createFrom);
}
}
@@ -566,11 +583,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
- JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.remove(jobId);
final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
- if (jobManagerRunner != null) {
- jobManagerRunnerTerminationFuture = jobManagerRunner.closeAsync();
+ if (jobManagerRunnerFuture != null) {
+ jobManagerRunnerTerminationFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
} else {
jobManagerRunnerTerminationFuture = CompletableFuture.completedFuture(null);
}
@@ -616,7 +633,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
private void terminateJobManagerRunners() {
log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
- final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunners.keySet());
+ final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunnerFutures.keySet());
for (JobID jobId : jobsToRemove) {
removeJobAndRegisterTerminationFuture(jobId, false);
@@ -739,16 +756,16 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
- final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
+ final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
- if (jobManagerRunner == null) {
+ if (jobManagerRunnerFuture == null) {
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
} else {
- final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture();
+ final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);
return leaderGatewayFuture.thenApplyAsync(
(JobMasterGateway jobMasterGateway) -> {
// check whether the retrieved JobMasterGateway belongs still to a running JobMaster
- if (jobManagerRunners.containsKey(jobId)) {
+ if (jobManagerRunnerFutures.containsKey(jobId)) {
return jobMasterGateway;
} else {
throw new CompletionException(new FlinkJobNotFoundException(jobId));
@@ -764,12 +781,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
@Nonnull
private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
- final int numberJobsRunning = jobManagerRunners.size();
+ final int numberJobsRunning = jobManagerRunnerFutures.size();
ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
numberJobsRunning);
- for (JobID jobId : jobManagerRunners.keySet()) {
+ for (JobID jobId : jobManagerRunnerFutures.keySet()) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture
@@ -836,10 +853,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
setNewFencingToken(dispatcherId);
- Collection<CompletableFuture<Void>> runFutures = new ArrayList<>(recoveredJobs.size());
+ Collection<CompletableFuture<?>> runFutures = new ArrayList<>(recoveredJobs.size());
for (JobGraph recoveredJob : recoveredJobs) {
- final CompletableFuture<Void> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
+ final CompletableFuture<?> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
runFutures.add(runFuture);
}
@@ -850,7 +867,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
}
- private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
+ private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> action) {
final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
.exceptionally((Throwable throwable) -> {
throw new CompletionException(
@@ -858,16 +875,16 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
throwable)); });
- return jobManagerTerminationFuture.thenRunAsync(
- ThrowingRunnable.unchecked(() -> {
+ return jobManagerTerminationFuture.thenComposeAsync(
+ FunctionUtils.uncheckedFunction((ignored) -> {
jobManagerTerminationFutures.remove(jobId);
- action.accept(jobGraph);
+ return action.apply(jobGraph);
}),
getMainThreadExecutor());
}
CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
- if (jobManagerRunners.containsKey(jobId)) {
+ if (jobManagerRunnerFutures.containsKey(jobId)) {
return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
} else {
return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
@@ -923,7 +940,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
public void onAddedJobGraph(final JobID jobId) {
runAsync(
() -> {
- if (!jobManagerRunners.containsKey(jobId)) {
+ if (!jobManagerRunnerFutures.containsKey(jobId)) {
// IMPORTANT: onAddedJobGraph can generate false positives and, thus, we must expect that
// the specified job is already removed from the SubmittedJobGraphStore. In this case,
// SubmittedJobGraphStore.recoverJob returns null.
@@ -962,7 +979,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
final JobID jobId = jobGraph.getJobID();
- if (jobManagerRunners.containsKey(jobId)) {
+ if (jobManagerRunnerFutures.containsKey(jobId)) {
// we must not release the job graph lock since it can only be locked once and
// is currently being executed. Once we support multiple locks, we must release
// the JobGraph here
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 1af10b8..2442676 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.Checkpoints;
@@ -64,6 +65,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.After;
import org.junit.AfterClass;
@@ -86,7 +88,9 @@ import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -144,6 +148,10 @@ public class DispatcherTest extends TestLogger {
/** Instance under test. */
private TestingDispatcher dispatcher;
+ private TestingHighAvailabilityServices haServices;
+
+ private HeartbeatServices heartbeatServices;
+
@BeforeClass
public static void setupClass() {
rpcService = new TestingRpcService();
@@ -166,13 +174,13 @@ public class DispatcherTest extends TestLogger {
jobGraph.setAllowQueuedScheduling(true);
fatalErrorHandler = new TestingFatalErrorHandler();
- final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
+ heartbeatServices = new HeartbeatServices(1000L, 10000L);
submittedJobGraphStore = new FaultySubmittedJobGraphStore();
dispatcherLeaderElectionService = new TestingLeaderElectionService();
jobMasterLeaderElectionService = new TestingLeaderElectionService();
- final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+ haServices = new TestingHighAvailabilityServices();
haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService);
@@ -188,14 +196,18 @@ public class DispatcherTest extends TestLogger {
createdJobManagerRunnerLatch = new CountDownLatch(2);
blobServer = new BlobServer(configuration, new VoidBlobStore());
+ }
- dispatcher = createDispatcher(heartbeatServices, haServices);
-
+ @Nonnull
+ private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+ final TestingDispatcher dispatcher = createDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
dispatcher.start();
+
+ return dispatcher;
}
@Nonnull
- private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices) throws Exception {
+ private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
return new TestingDispatcher(
rpcService,
Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
@@ -207,7 +219,7 @@ public class DispatcherTest extends TestLogger {
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
null,
new MemoryArchivedExecutionGraphStore(),
- new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch),
+ jobManagerRunnerFactory,
fatalErrorHandler);
}
@@ -216,7 +228,13 @@ public class DispatcherTest extends TestLogger {
try {
fatalErrorHandler.rethrowError();
} finally {
- RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+ if (dispatcher != null) {
+ RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+ }
+ }
+
+ if (haServices != null) {
+ haServices.closeAndCleanupAllData();
}
if (blobServer != null) {
@@ -230,6 +248,8 @@ public class DispatcherTest extends TestLogger {
*/
@Test
public void testJobSubmission() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
// wait for the leader to be elected
@@ -251,6 +271,8 @@ public class DispatcherTest extends TestLogger {
*/
@Test
public void testLeaderElection() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
CompletableFuture<Void> jobIdsFuture = new CompletableFuture<>();
submittedJobGraphStore.setJobIdsFunction(
(Collection<JobID> jobIds) -> {
@@ -270,6 +292,8 @@ public class DispatcherTest extends TestLogger {
*/
@Test
public void testSubmittedJobGraphListener() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -294,6 +318,8 @@ public class DispatcherTest extends TestLogger {
@Test
public void testOnAddedJobGraphRecoveryFailure() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
final FlinkException expectedFailure = new FlinkException("Expected failure");
submittedJobGraphStore.setRecoveryFailure(expectedFailure);
@@ -313,6 +339,8 @@ public class DispatcherTest extends TestLogger {
@Test
public void testOnAddedJobGraphWithFinishedJob() throws Throwable {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
@@ -333,6 +361,8 @@ public class DispatcherTest extends TestLogger {
*/
@Test
public void testCacheJobExecutionResult() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -358,6 +388,8 @@ public class DispatcherTest extends TestLogger {
@Test
public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -374,6 +406,8 @@ public class DispatcherTest extends TestLogger {
*/
@Test
public void testJobRecovery() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
// elect the initial dispatcher as the leader
@@ -413,6 +447,8 @@ public class DispatcherTest extends TestLogger {
final URI externalPointer = createTestingSavepoint();
final Path savepointPath = Paths.get(externalPointer);
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
@@ -447,7 +483,9 @@ public class DispatcherTest extends TestLogger {
* to it. See FLINK-8887.
*/
@Test
- public void testWaitingForJobMasterLeadership() throws ExecutionException, InterruptedException {
+ public void testWaitingForJobMasterLeadership() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
@@ -476,6 +514,8 @@ public class DispatcherTest extends TestLogger {
*/
@Test
public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
final FlinkException testException = new FlinkException("Test exception");
submittedJobGraphStore.setJobIdsFunction(
(Collection<JobID> jobIds) -> {
@@ -500,6 +540,8 @@ public class DispatcherTest extends TestLogger {
public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
final FlinkException testException = new FlinkException("Test exception");
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null);
submittedJobGraphStore.putJobGraph(submittedJobGraph);
@@ -526,6 +568,8 @@ public class DispatcherTest extends TestLogger {
public void testJobSubmissionErrorAfterJobRecovery() throws Exception {
final FlinkException testException = new FlinkException("Test exception");
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
final JobGraph failingJobGraph = createFailingJobGraph(testException);
final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(failingJobGraph, null);
@@ -540,6 +584,102 @@ public class DispatcherTest extends TestLogger {
fatalErrorHandler.clearError();
}
+ /**
+ * Tests that a blocking {@link JobManagerRunner} creation, e.g. due to blocking FileSystem access,
+ * does not block the {@link Dispatcher}.
+ *
+ * <p>See FLINK-10314
+ */
+ @Test
+ public void testBlockingJobManagerRunner() throws Exception {
+ final OneShotLatch jobManagerRunnerCreationLatch = new OneShotLatch();
+ dispatcher = createAndStartDispatcher(
+ heartbeatServices,
+ haServices,
+ new BlockingJobManagerRunnerFactory(jobManagerRunnerCreationLatch::await));
+
+ dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+ final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+ assertThat(submissionFuture.isDone(), is(false));
+
+ final CompletableFuture<Collection<String>> metricQueryServicePathsFuture = dispatcherGateway.requestMetricQueryServicePaths(Time.seconds(5L));
+
+ assertThat(metricQueryServicePathsFuture.get(), is(empty()));
+
+ assertThat(submissionFuture.isDone(), is(false));
+
+ jobManagerRunnerCreationLatch.trigger();
+
+ submissionFuture.get();
+ }
+
+ /**
+ * Tests that a failing {@link JobManagerRunner} will be properly cleaned up.
+ */
+ @Test
+ public void testFailingJobManagerRunnerCleanup() throws Exception {
+ final FlinkException testException = new FlinkException("Test exception.");
+ final ArrayBlockingQueue<Optional<Exception>> queue = new ArrayBlockingQueue<>(2);
+
+ dispatcher = createAndStartDispatcher(
+ heartbeatServices,
+ haServices,
+ new BlockingJobManagerRunnerFactory(() -> {
+ final Optional<Exception> take = queue.take();
+ final Exception exception = take.orElse(null);
+
+ if (exception != null) {
+ throw exception;
+ }
+ }));
+
+ dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+ final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+ assertThat(submissionFuture.isDone(), is(false));
+
+ queue.offer(Optional.of(testException));
+
+ try {
+ submissionFuture.get();
+ fail("Should fail because we could not instantiate the JobManagerRunner.");
+ } catch (Exception e) {
+ assertThat(ExceptionUtils.findThrowable(e, t -> t.equals(testException)).isPresent(), is(true));
+ }
+
+ submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+ queue.offer(Optional.empty());
+
+ submissionFuture.get();
+ }
+
+ private final class BlockingJobManagerRunnerFactory extends TestingJobManagerRunnerFactory {
+
+ @Nonnull
+ private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;
+
+ BlockingJobManagerRunnerFactory(@Nonnull ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
+ super(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null));
+
+ this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch;
+ }
+
+ @Override
+ public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+ jobManagerRunnerCreationLatch.run();
+
+ return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+ }
+ }
+
private void electDispatcher() {
UUID expectedLeaderSessionId = UUID.randomUUID();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index cb48648..992f087 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -40,7 +40,7 @@ import static org.mockito.Mockito.when;
* {@link org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory} implementation for
* testing purposes.
*/
-final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
private final CompletableFuture<JobGraph> jobGraphFuture;
private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
@@ -63,12 +63,13 @@ final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunne
BlobServer blobServer,
JobManagerSharedServices jobManagerSharedServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
- FatalErrorHandler fatalErrorHandler) {
+ FatalErrorHandler fatalErrorHandler) throws Exception {
jobGraphFuture.complete(jobGraph);
final JobManagerRunner mock = mock(JobManagerRunner.class);
when(mock.getResultFuture()).thenReturn(resultFuture);
when(mock.closeAsync()).thenReturn(terminationFuture);
+ when(mock.getJobGraph()).thenReturn(jobGraph);
return mock;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index c38ea5d..950a4e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -298,7 +298,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
@Override
public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
- return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
+ return CompletableFuture.completedFuture(new ResourceOverview(1, 1, 1));
}
@Override