You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/11 23:25:59 UTC
flink git commit: [FLINK-9706] Properly wait for termination of
JobManagerRunner before restarting jobs
Repository: flink
Updated Branches:
refs/heads/master dc7d81c9c -> 3c4e59a7f
[FLINK-9706] Properly wait for termination of JobManagerRunner before restarting jobs
To avoid race conditions during resource cleanup, we now wait for the complete
termination of any previously running JobMaster responsible for the same job (e.g. one originating
from a job recovery or a re-submission) before starting the job again.
This closes #6279.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c4e59a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c4e59a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c4e59a7
Branch: refs/heads/master
Commit: 3c4e59a7f78deeaccf41022e92699e1ef7510cc3
Parents: dc7d81c
Author: Till Rohrmann <tr...@apache.org>
Authored: Sat Jul 7 10:53:38 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 12 01:25:23 2018 +0200
----------------------------------------------------------------------
.../util/function/ConsumerWithException.java | 43 +++++++
.../flink/runtime/dispatcher/Dispatcher.java | 120 ++++++++++++-------
.../DispatcherResourceCleanupTest.java | 59 ++++++++-
3 files changed, 176 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3c4e59a7/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
new file mode 100644
index 0000000..09507d4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.function.Consumer;
+
+/**
+ * A checked extension of the {@link Consumer} interface.
+ *
+ * <p>Implementations supply {@link #acceptWithException(Object)}, which may throw an exception
+ * of type {@code E}; the default {@link #accept(Object)} delegates to it, so an instance can be
+ * used wherever a plain {@link Consumer} is expected.
+ *
+ * @param <T> type of the consumed value
+ * @param <E> type of the thrown exception
+ */
+@FunctionalInterface
+public interface ConsumerWithException<T, E extends Throwable> extends Consumer<T> {
+
+	/**
+	 * Performs this operation on the given value.
+	 *
+	 * @param value the input value
+	 * @throws E if the operation fails
+	 */
+	void acceptWithException(T value) throws E;
+
+	/**
+	 * Calls {@link #acceptWithException(Object)} and passes any {@link Throwable} it throws to
+	 * {@link ExceptionUtils#rethrow(Throwable)}; NOTE(review): that helper presumably rethrows
+	 * checked exceptions in unchecked form - confirm its wrapping semantics.
+	 */
+	@Override
+	default void accept(T value) {
+		try {
+			acceptWithException(value);
+		} catch (Throwable t) {
+			ExceptionUtils.rethrow(t);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c4e59a7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 652782f..5306d6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -64,13 +64,13 @@ import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ConsumerWithException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -126,7 +126,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
@Nullable
protected final String restAddress;
- private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null);
+ private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
public Dispatcher(
RpcService rpcService,
@@ -173,6 +173,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory);
+
+ this.jobManagerTerminationFutures = new HashMap<>(2);
}
//------------------------------------------------------
@@ -183,11 +185,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
public CompletableFuture<Void> postStop() {
log.info("Stopping dispatcher {}.", getAddress());
- final CompletableFuture<Void> jobManagerRunnersTerminationFuture = terminateJobManagerRunners();
-
- final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList(
- jobManagerRunnersTerminationFuture,
- orphanedJobManagerRunnersTerminationFuture));
+ final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = terminateJobManagerRunnersAndGetTerminationFuture();
return FutureUtils.runAfterwards(
allJobManagerRunnersTerminationFuture,
@@ -238,20 +236,26 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
final JobID jobId = jobGraph.getJobID();
log.info("Submitting job {} ({}).", jobId, jobGraph.getName());
+ final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
try {
- final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
+ jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e));
+ }
- if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
- return FutureUtils.completedExceptionally(
- new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
- } else {
- persistAndRunJob(jobGraph);
+ if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
+ return FutureUtils.completedExceptionally(
+ new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
+ } else {
+ final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobId, jobGraph, this::persistAndRunJob)
+ .thenApply(ignored -> Acknowledge.get());
- return CompletableFuture.completedFuture(Acknowledge.get());
- }
- } catch (Exception e) {
- return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to submit job %s.", jobId), e));
+ return persistAndRunFuture.exceptionally(
+ (Throwable throwable) -> {
+ throw new CompletionException(
+ new JobSubmissionException(jobId, "Failed to submit job.", throwable));
+ });
}
}
@@ -536,7 +540,25 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) {
final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA);
- registerOrphanedJobManagerTerminationFuture(cleanupFuture);
+ registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
+ }
+
+	/**
+	 * Tracks the termination future of the {@link JobManagerRunner} of {@code jobId} so that a later
+	 * submission or recovery of the same job can wait for it.
+	 *
+	 * <p>Once the future completes normally, the entry is removed again, but only if it still refers
+	 * to this very future; a future registered for the same job in the meantime is left in place.
+	 * The {@code Preconditions.checkState} guard fails if a termination future is already
+	 * registered for {@code jobId}.
+	 *
+	 * @param jobId job whose runner is terminating
+	 * @param jobManagerRunnerTerminationFuture future completing once the runner has terminated
+	 */
+	private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
+		Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
+
+		jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);
+
+		// Clean up the pending termination future. Map#remove(key, value) only drops the mapping if it
+		// still points to this future (CompletableFuture inherits identity equals from Object).
+		// thenRunAsync is skipped on exceptional completion, so a failed termination stays registered.
+		jobManagerRunnerTerminationFuture.thenRunAsync(
+			() -> jobManagerTerminationFutures.remove(jobId, jobManagerRunnerTerminationFuture),
+			getUnfencedMainThreadExecutor());
}
private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
@@ -573,19 +595,21 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
/**
* Terminate all currently running {@link JobManagerRunner}.
- *
- * @return Future which is completed once all {@link JobManagerRunner} have terminated
+ *
+ * <p>Each running job is removed via {@code removeJobAndRegisterTerminationFuture(jobId, false)},
+ * so its termination future ends up tracked in {@code jobManagerTerminationFutures}.
*/
- private CompletableFuture<Void> terminateJobManagerRunners() {
+ private void terminateJobManagerRunners() {
log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
+ // copy the keys first; removing a job presumably mutates jobManagerRunners while iterating
final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunners.keySet());
- final List<CompletableFuture<Void>> terminationFutures = jobsToRemove.stream()
- .map(jobId -> removeJob(jobId, false))
- .collect(Collectors.toList());
+ for (JobID jobId : jobsToRemove) {
+ removeJobAndRegisterTerminationFuture(jobId, false);
+ }
+ }
- return FutureUtils.completeAll(terminationFutures);
+ /**
+ * Terminates all running {@link JobManagerRunner}s and returns the {@code FutureUtils.completeAll}
+ * future over every termination future tracked in {@code jobManagerTerminationFutures}, including
+ * terminations that were registered before this call.
+ *
+ * <p>NOTE(review): the live {@code values()} view is handed to {@code completeAll}; this assumes
+ * completeAll reads it eagerly - confirm, or pass a copy.
+ */
+ private CompletableFuture<Void> terminateJobManagerRunnersAndGetTerminationFuture() {
+ terminateJobManagerRunners();
+ final Collection<CompletableFuture<Void>> values = jobManagerTerminationFutures.values();
+ return FutureUtils.completeAll(values);
}
/**
@@ -677,12 +701,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause));
}
- private void registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
- orphanedJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList(
- orphanedJobManagerRunnersTerminationFuture,
- jobManagerRunnerTerminationFuture));
- }
-
private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
@@ -741,7 +759,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();
- final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenApplyAsync(
+ final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
getUnfencedMainThreadExecutor());
@@ -761,31 +779,44 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
});
}
- private boolean tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
+ /**
+ * Accepts leadership for {@code newLeaderSessionID} if it is still held and starts the recovered jobs.
+ *
+ * <p>Each recovered job is started via {@code waitForTerminatingJobManager}, i.e. only after a
+ * pending termination of a previous JobManager for the same job has completed.
+ *
+ * @return future completing with {@code true} after {@code FutureUtils.waitForAll} over the
+ * per-job run futures completes, or an already completed {@code false} future if leadership
+ * is no longer held
+ */
+ private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);
if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
+ // presumably clears a previous leader's state first (see setNewFencingToken), which registers
+ // termination futures that the recovered jobs below then wait for
setNewFencingToken(dispatcherId);
+ Collection<CompletableFuture<Void>> runFutures = new ArrayList<>(recoveredJobs.size());
+
for (JobGraph recoveredJob : recoveredJobs) {
- try {
- runJob(recoveredJob);
- } catch (Exception e) {
- throw new CompletionException(
- new FlinkException(
- String.format("Failed to recover job %s.", recoveredJob.getJobID()),
- e));
- }
+ final CompletableFuture<Void> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
+ runFutures.add(runFuture);
}
- return true;
+ return FutureUtils.waitForAll(runFutures).thenApply(ignored -> true);
} else {
log.debug("Dispatcher {} lost leadership before accepting it. Stop recovering jobs for fencing token {}.", getAddress(), dispatcherId);
- return false;
+ return CompletableFuture.completedFuture(false);
}
}
+	/**
+	 * Runs {@code action} on {@code jobGraph} in the main thread once the termination future
+	 * registered for {@code jobId} in {@code jobManagerTerminationFutures} has completed. If no such
+	 * future is registered, an already completed one is used and the action is scheduled right away.
+	 *
+	 * <p>If the previous termination future completed exceptionally, the returned future fails with
+	 * a {@link DispatcherException} and {@code action} is not executed.
+	 */
+	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
+		final CompletableFuture<Void> previousTerminationFuture =
+			jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
+
+		final CompletableFuture<Void> guardedTerminationFuture = previousTerminationFuture.exceptionally(
+			(Throwable throwable) -> {
+				throw new CompletionException(
+					new DispatcherException(
+						String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
+						throwable));
+			});
+
+		return guardedTerminationFuture.thenRunAsync(
+			() -> {
+				// the awaited termination completed normally, so stop tracking it before running the job
+				jobManagerTerminationFutures.remove(jobId);
+				action.accept(jobGraph);
+			},
+			getMainThreadExecutor());
+	}
+
private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
// clear the state if we've been the leader before
if (getFencingToken() != null) {
@@ -796,8 +827,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
private void clearDispatcherState() {
- final CompletableFuture<Void> jobManagerRunnersTerminationFuture = terminateJobManagerRunners();
- registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture);
+ // termination futures are tracked per job by terminateJobManagerRunners itself
+ terminateJobManagerRunners();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/3c4e59a7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 71125e4..e42b14a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -127,6 +127,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private CompletableFuture<JobID> deleteAllFuture;
private CompletableFuture<ArchivedExecutionGraph> resultFuture;
private CompletableFuture<JobID> cleanupJobFuture;
+ private CompletableFuture<Void> terminationFuture;
@BeforeClass
public static void setupClass() {
@@ -162,6 +163,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
.createTestingBlobStore();
cleanupJobFuture = new CompletableFuture<>();
+ terminationFuture = new CompletableFuture<>();
blobServer = new TestingBlobServer(configuration, testingBlobStore, cleanupJobFuture);
@@ -185,7 +187,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
null,
new MemoryArchivedExecutionGraphStore(),
- new TestingJobManagerRunnerFactory(resultFuture, CompletableFuture.completedFuture(null)),
+ new TestingJobManagerRunnerFactory(resultFuture, terminationFuture),
fatalErrorHandler);
dispatcher.start();
@@ -225,6 +227,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
// complete the job
resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobId).setState(JobStatus.FINISHED).build());
+ terminationFuture.complete(null);
assertThat(cleanupJobFuture.get(), equalTo(jobId));
@@ -245,6 +248,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
// job not finished
resultFuture.completeExceptionally(new JobNotFinishedException(jobId));
+ terminationFuture.complete(null);
assertThat(cleanupJobFuture.get(), equalTo(jobId));
@@ -266,6 +270,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
submitJob();
dispatcher.shutDown();
+ terminationFuture.complete(null);
dispatcher.getTerminationFuture().get();
assertThat(cleanupJobFuture.get(), equalTo(jobId));
@@ -295,6 +300,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
assertThat(runningJobsRegistry.contains(jobId), is(true));
resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+ terminationFuture.complete(null);
// wait for the clearing
clearedJobLatch.await();
@@ -302,6 +308,57 @@ public class DispatcherResourceCleanupTest extends TestLogger {
assertThat(runningJobsRegistry.contains(jobId), is(false));
}
+ /**
+ * Tests that the previous JobManager needs to be completely terminated
+ * before a new job with the same {@link JobID} is started.
+ */
+ @Test
+ public void testJobSubmissionUnderSameJobId() throws Exception {
+ submitJob();
+
+ // NOTE(review): presumably keeps the registry from reporting DONE so that the
+ // re-submission below is not rejected - confirm against SingleRunningJobsRegistry.
+ runningJobsRegistry.setJobRunning(jobId);
+ // let the first job end without finishing; the runner's terminationFuture is still pending
+ resultFuture.completeExceptionally(new JobNotFinishedException(jobId));
+
+ final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+
+ try {
+ // the re-submission must not be acknowledged while the previous termination is pending
+ submissionFuture.get(10L, TimeUnit.MILLISECONDS);
+ fail("The job submission future should not complete until the previous JobManager " +
+ "termination future has been completed.");
+ } catch (TimeoutException ignored) {
+ // expected
+ } finally {
+ // complete the previous JobManager's termination, which should unblock the submission
+ terminationFuture.complete(null);
+ }
+
+ assertThat(submissionFuture.get(), equalTo(Acknowledge.get()));
+ }
+
+ /**
+ * Tests that recovered jobs will only be started after the complete termination of any
+ * other previously running JobMasters for the same job.
+ */
+ @Test
+ public void testJobRecoveryWithPendingTermination() throws Exception {
+ submitJob();
+ runningJobsRegistry.setJobRunning(jobId);
+
+ // revoke and re-grant leadership; the recovered job has to wait for the still pending
+ // terminationFuture of the JobManager started by submitJob()
+ dispatcherLeaderElectionService.notLeader();
+ final UUID leaderSessionId = UUID.randomUUID();
+ final CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(leaderSessionId);
+
+ try {
+ // confirming leadership now waits until the recovered jobs have been started
+ leaderFuture.get(10L, TimeUnit.MILLISECONDS);
+ fail("We should not become leader before all previously running JobMasters have terminated.");
+ } catch (TimeoutException ignored) {
+ // expected
+ } finally {
+ // finish the previous termination so recovery (and leadership confirmation) can proceed
+ terminationFuture.complete(null);
+ }
+
+ assertThat(leaderFuture.get(), equalTo(leaderSessionId));
+ }
+
private static final class SingleRunningJobsRegistry implements RunningJobsRegistry {
@Nonnull