Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/11 23:25:59 UTC

flink git commit: [FLINK-9706] Properly wait for termination of JobManagerRunner before restarting jobs

Repository: flink
Updated Branches:
  refs/heads/master dc7d81c9c -> 3c4e59a7f


[FLINK-9706] Properly wait for termination of JobManagerRunner before restarting jobs

In order to avoid race conditions between the resource clean-up of a terminating JobMaster and
the start-up of its successor, we now wait for the proper termination of a previously running
JobMaster responsible for the same job before starting it again (e.g. after a job recovery or
a re-submission).

This closes #6279.
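
To make the change easier to follow before diving into the diff, here is a minimal,
self-contained sketch of the pattern this commit introduces. It is illustrative only: the
class and method names below are not part of the Flink code base, and the real Dispatcher
keys its map by JobID, runs the continuation on its main thread executor, and additionally
fails the start-up if the previous termination itself failed.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    // Illustrative per-job termination registry: starting a job with the same id is
    // chained onto the previous runner's termination future, so resource clean-up
    // always finishes before the job is started again.
    public class TerminationAwareStarter {

        private final Map<String, CompletableFuture<Void>> terminationFutures = new HashMap<>();

        // Called when the runner for the given job begins shutting down.
        public void registerTermination(String jobId, CompletableFuture<Void> terminationFuture) {
            terminationFutures.put(jobId, terminationFuture);
        }

        // Starts the job only after a possibly pending termination has completed.
        public CompletableFuture<Void> startAfterTermination(String jobId, Runnable startAction) {
            final CompletableFuture<Void> pending =
                terminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));

            return pending.thenRun(() -> {
                terminationFutures.remove(jobId);
                startAction.run();
            });
        }
    }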


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c4e59a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c4e59a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c4e59a7

Branch: refs/heads/master
Commit: 3c4e59a7f78deeaccf41022e92699e1ef7510cc3
Parents: dc7d81c
Author: Till Rohrmann <tr...@apache.org>
Authored: Sat Jul 7 10:53:38 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 12 01:25:23 2018 +0200

----------------------------------------------------------------------
 .../util/function/ConsumerWithException.java    |  43 +++++++
 .../flink/runtime/dispatcher/Dispatcher.java    | 120 ++++++++++++-------
 .../DispatcherResourceCleanupTest.java          |  59 ++++++++-
 3 files changed, 176 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c4e59a7/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
new file mode 100644
index 0000000..09507d4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.function.Consumer;
+
+/**
+ * A checked extension of the {@link Consumer} interface.
+ *
+ * @param <T> type of the first argument
+ * @param <E> type of the thrown exception
+ */
+public interface ConsumerWithException<T, E extends Throwable> extends Consumer<T> {
+
+	void acceptWithException(T value) throws E;
+
+	@Override
+	default void accept(T value) {
+		try {
+			acceptWithException(value);
+		} catch (Throwable t) {
+			ExceptionUtils.rethrow(t);
+		}
+	}
+}
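
A brief usage sketch of the interface added above (hypothetical example, not part of this
commit; it only assumes flink-core on the classpath): a method reference that declares a
checked exception can be assigned to the interface, while the inherited Consumer#accept
rethrows such exceptions unchecked via ExceptionUtils.rethrow.

    import org.apache.flink.util.function.ConsumerWithException;

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class ConsumerWithExceptionExample {

        public static void main(String[] args) throws IOException {
            // Files.delete(Path) throws a checked IOException and still fits the interface.
            ConsumerWithException<Path, IOException> deleteFile = Files::delete;

            // The checked variant propagates the IOException as declared.
            deleteFile.acceptWithException(Files.createTempFile("cwe-example", ".tmp"));

            // Through the plain java.util.function.Consumer API, a checked exception
            // would be rethrown unchecked by the default accept() implementation.
            deleteFile.accept(Files.createTempFile("cwe-example", ".tmp"));
        }
    }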

http://git-wip-us.apache.org/repos/asf/flink/blob/3c4e59a7/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 652782f..5306d6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -64,13 +64,13 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ConsumerWithException;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -126,7 +126,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	@Nullable
 	protected final String restAddress;
 
-	private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null);
+	private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
 
 	public Dispatcher(
 			RpcService rpcService,
@@ -173,6 +173,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
 
 		this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory);
+
+		this.jobManagerTerminationFutures = new HashMap<>(2);
 	}
 
 	//------------------------------------------------------
@@ -183,11 +185,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	public CompletableFuture<Void> postStop() {
 		log.info("Stopping dispatcher {}.", getAddress());
 
-		final CompletableFuture<Void> jobManagerRunnersTerminationFuture = terminateJobManagerRunners();
-
-		final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList(
-			jobManagerRunnersTerminationFuture,
-			orphanedJobManagerRunnersTerminationFuture));
+		final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = terminateJobManagerRunnersAndGetTerminationFuture();
 
 		return FutureUtils.runAfterwards(
 			allJobManagerRunnersTerminationFuture,
@@ -238,20 +236,26 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		final JobID jobId = jobGraph.getJobID();
 
 		log.info("Submitting job {} ({}).", jobId, jobGraph.getName());
+		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
 
 		try {
-			final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
+			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e));
+		}
 
-			if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
-				return FutureUtils.completedExceptionally(
-					new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
-			} else {
-				persistAndRunJob(jobGraph);
+		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
+			return FutureUtils.completedExceptionally(
+				new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
+		} else {
+			final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobId, jobGraph, this::persistAndRunJob)
+				.thenApply(ignored -> Acknowledge.get());
 
-				return CompletableFuture.completedFuture(Acknowledge.get());
-			}
-		} catch (Exception e) {
-			return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to submit job %s.", jobId), e));
+			return persistAndRunFuture.exceptionally(
+				(Throwable throwable) -> {
+					throw new CompletionException(
+						new JobSubmissionException(jobId, "Failed to submit job.", throwable));
+				});
 		}
 	}
 
@@ -536,7 +540,25 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) {
 		final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA);
 
-		registerOrphanedJobManagerTerminationFuture(cleanupFuture);
+		registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
+	}
+
+	private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
+		Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
+
+		jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);
+
+		// clean up the pending termination future
+		jobManagerRunnerTerminationFuture.thenRunAsync(
+			() -> {
+				final CompletableFuture<Void> terminationFuture = jobManagerTerminationFutures.remove(jobId);
+
+				//noinspection ObjectEquality
+				if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) {
+					jobManagerTerminationFutures.put(jobId, terminationFuture);
+				}
+			},
+			getUnfencedMainThreadExecutor());
 	}
 
 	private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
@@ -573,19 +595,21 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	/**
 	 * Terminate all currently running {@link JobManagerRunner}.
-	 *
-	 * @return Future which is completed once all {@link JobManagerRunner} have terminated
 	 */
-	private CompletableFuture<Void> terminateJobManagerRunners() {
+	private void terminateJobManagerRunners() {
 		log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
 
 		final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunners.keySet());
 
-		final List<CompletableFuture<Void>> terminationFutures = jobsToRemove.stream()
-			.map(jobId -> removeJob(jobId, false))
-			.collect(Collectors.toList());
+		for (JobID jobId : jobsToRemove) {
+			removeJobAndRegisterTerminationFuture(jobId, false);
+		}
+	}
 
-		return FutureUtils.completeAll(terminationFutures);
+	private CompletableFuture<Void> terminateJobManagerRunnersAndGetTerminationFuture() {
+		terminateJobManagerRunners();
+		final Collection<CompletableFuture<Void>> values = jobManagerTerminationFutures.values();
+		return FutureUtils.completeAll(values);
 	}
 
 	/**
@@ -677,12 +701,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause));
 	}
 
-	private void registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
-		orphanedJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList(
-			orphanedJobManagerRunnersTerminationFuture,
-			jobManagerRunnerTerminationFuture));
-	}
-
 	private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
 		final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
 
@@ -741,7 +759,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();
 
-		final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenApplyAsync(
+		final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
 			(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
 			getUnfencedMainThreadExecutor());
 
@@ -761,31 +779,44 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			});
 	}
 
-	private boolean tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
+	private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
 		final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);
 
 		if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
 			log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
 			setNewFencingToken(dispatcherId);
 
+			Collection<CompletableFuture<Void>> runFutures = new ArrayList<>(recoveredJobs.size());
+
 			for (JobGraph recoveredJob : recoveredJobs) {
-				try {
-					runJob(recoveredJob);
-				} catch (Exception e) {
-					throw new CompletionException(
-						new FlinkException(
-							String.format("Failed to recover job %s.", recoveredJob.getJobID()),
-							e));
-				}
+				final CompletableFuture<Void> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
+				runFutures.add(runFuture);
 			}
 
-			return true;
+			return FutureUtils.waitForAll(runFutures).thenApply(ignored -> true);
 		} else {
 			log.debug("Dispatcher {} lost leadership before accepting it. Stop recovering jobs for fencing token {}.", getAddress(), dispatcherId);
-			return false;
+			return CompletableFuture.completedFuture(false);
 		}
 	}
 
+	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
+		final CompletableFuture<Void> jobManagerTerminationFuture = jobManagerTerminationFutures
+			.getOrDefault(jobId, CompletableFuture.completedFuture(null))
+			.exceptionally((Throwable throwable) -> {
+				throw new CompletionException(
+					new DispatcherException(
+						String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
+						throwable)); });
+
+		return jobManagerTerminationFuture.thenRunAsync(
+			() -> {
+				jobManagerTerminationFutures.remove(jobId);
+				action.accept(jobGraph);
+			},
+			getMainThreadExecutor());
+	}
+
 	private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
 		// clear the state if we've been the leader before
 		if (getFencingToken() != null) {
@@ -796,8 +827,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	private void clearDispatcherState() {
-		final CompletableFuture<Void> jobManagerRunnersTerminationFuture = terminateJobManagerRunners();
-		registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture);
+		terminateJobManagerRunners();
 	}
 
 	/**
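
One detail in the Dispatcher changes above that is easy to miss: tryAcceptLeadershipAndRunJobs
now returns a CompletableFuture<Boolean> instead of a plain boolean, so the call site switches
from thenApplyAsync to thenComposeAsync. A standalone sketch of why compose is needed there
(illustrative only, not Flink code):

    import java.util.concurrent.CompletableFuture;

    public class ComposeVsApply {

        static CompletableFuture<Boolean> acceptLeadershipAndRunJobs() {
            // Stand-in for an asynchronous step that itself returns a future.
            return CompletableFuture.completedFuture(true);
        }

        public static void main(String[] args) {
            CompletableFuture<Void> recoveredJobs = CompletableFuture.completedFuture(null);

            // thenApply on a future-returning function nests the futures ...
            CompletableFuture<CompletableFuture<Boolean>> nested =
                recoveredJobs.thenApply(ignored -> acceptLeadershipAndRunJobs());

            // ... while thenCompose flattens them, so downstream stages run only after
            // the inner future (here: all recovered jobs have been started) completes.
            CompletableFuture<Boolean> flattened =
                recoveredJobs.thenCompose(ignored -> acceptLeadershipAndRunJobs());

            System.out.println(nested.join().join() + " / " + flattened.join());
        }
    }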

http://git-wip-us.apache.org/repos/asf/flink/blob/3c4e59a7/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 71125e4..e42b14a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -127,6 +127,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 	private CompletableFuture<JobID> deleteAllFuture;
 	private CompletableFuture<ArchivedExecutionGraph> resultFuture;
 	private CompletableFuture<JobID> cleanupJobFuture;
+	private CompletableFuture<Void> terminationFuture;
 
 	@BeforeClass
 	public static void setupClass() {
@@ -162,6 +163,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 			.createTestingBlobStore();
 
 		cleanupJobFuture = new CompletableFuture<>();
+		terminationFuture = new CompletableFuture<>();
 
 		blobServer = new TestingBlobServer(configuration, testingBlobStore, cleanupJobFuture);
 
@@ -185,7 +187,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(resultFuture, CompletableFuture.completedFuture(null)),
+			new TestingJobManagerRunnerFactory(resultFuture, terminationFuture),
 			fatalErrorHandler);
 
 		dispatcher.start();
@@ -225,6 +227,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
 		// complete the job
 		resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobId).setState(JobStatus.FINISHED).build());
+		terminationFuture.complete(null);
 
 		assertThat(cleanupJobFuture.get(), equalTo(jobId));
 
@@ -245,6 +248,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
 		// job not finished
 		resultFuture.completeExceptionally(new JobNotFinishedException(jobId));
+		terminationFuture.complete(null);
 
 		assertThat(cleanupJobFuture.get(), equalTo(jobId));
 
@@ -266,6 +270,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		submitJob();
 
 		dispatcher.shutDown();
+		terminationFuture.complete(null);
 		dispatcher.getTerminationFuture().get();
 
 		assertThat(cleanupJobFuture.get(), equalTo(jobId));
@@ -295,6 +300,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		assertThat(runningJobsRegistry.contains(jobId), is(true));
 
 		resultFuture.complete(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+		terminationFuture.complete(null);
 
 		// wait for the clearing
 		clearedJobLatch.await();
@@ -302,6 +308,57 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		assertThat(runningJobsRegistry.contains(jobId), is(false));
 	}
 
+	/**
+	 * Tests that the previous JobManager needs to be completely terminated
+	 * before a new job with the same {@link JobID} is started.
+	 */
+	@Test
+	public void testJobSubmissionUnderSameJobId() throws Exception {
+		submitJob();
+
+		runningJobsRegistry.setJobRunning(jobId);
+		resultFuture.completeExceptionally(new JobNotFinishedException(jobId));
+
+		final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+
+		try {
+			submissionFuture.get(10L, TimeUnit.MILLISECONDS);
+			fail("The job submission future should not complete until the previous JobManager " +
+				"termination future has been completed.");
+		} catch (TimeoutException ignored) {
+			// expected
+		} finally {
+			terminationFuture.complete(null);
+		}
+
+		assertThat(submissionFuture.get(), equalTo(Acknowledge.get()));
+	}
+
+	/**
+	 * Tests that recovered jobs will only be started after the complete termination of any
+	 * other previously running JobMasters for the same job.
+	 */
+	@Test
+	public void testJobRecoveryWithPendingTermination() throws Exception {
+		submitJob();
+		runningJobsRegistry.setJobRunning(jobId);
+
+		dispatcherLeaderElectionService.notLeader();
+		final UUID leaderSessionId = UUID.randomUUID();
+		final CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(leaderSessionId);
+
+		try {
+			leaderFuture.get(10L, TimeUnit.MILLISECONDS);
+			fail("We should not become leader before all previously running JobMasters have terminated.");
+		} catch (TimeoutException ignored) {
+			// expected
+		} finally {
+			terminationFuture.complete(null);
+		}
+
+		assertThat(leaderFuture.get(), equalTo(leaderSessionId));
+	}
+
 	private static final class SingleRunningJobsRegistry implements RunningJobsRegistry {
 
 		@Nonnull