You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/14 13:03:46 UTC

[GitHub] asfgit closed pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader

asfgit closed pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678
 
 
   

This is a pull request (PR) merged from a forked repository.
Because GitHub hides the original diff once a PR is merged, the diff is
reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index 326e924b448..d8ad5aba8ea 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -22,6 +22,7 @@
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Properties;
 
@@ -36,8 +37,8 @@
 	@Nonnull
 	private final SavepointRestoreSettings savepointRestoreSettings;
 
-	public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
-		super(configDir, dynamicProperties, args, restPort);
+	public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String hostname, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
+		super(configDir, dynamicProperties, args, hostname, restPort);
 		this.jobClassName = jobClassName;
 		this.savepointRestoreSettings = savepointRestoreSettings;
 	}
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index 3c65ba864ed..17217eff018 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -32,6 +32,7 @@
 
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
 
 /**
@@ -67,6 +68,7 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
 		final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
 		final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
 		final int restPort = Integer.parseInt(restPortString);
+		final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
 		final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
 		final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);
 
@@ -74,6 +76,7 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
 			configDir,
 			dynamicProperties,
 			commandLine.getArgs(),
+			hostname,
 			restPort,
 			jobClassName,
 			savepointRestoreSettings);
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
index 5864c8a985d..6fc5b76f246 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -30,7 +30,7 @@
  * @param <E> type of the thrown exception
  */
 @FunctionalInterface
-public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T, U> {
+public interface BiConsumerWithException<T, U, E extends Throwable> {
 
 	/**
 	 * Performs this operation on the given arguments.
@@ -39,14 +39,23 @@
 	 * @param u the second input argument
 	 * @throws E in case of an error
 	 */
-	void acceptWithException(T t, U u) throws E;
+	void accept(T t, U u) throws E;
 
-	@Override
-	default void accept(T t, U u) {
-		try {
-			acceptWithException(t, u);
-		} catch (Throwable e) {
-			ExceptionUtils.rethrow(e);
-		}
+	/**
+	 * Convert a {@link BiConsumerWithException} into a {@link BiConsumer}.
+	 *
+	 * @param biConsumerWithException BiConsumer with exception to convert into a {@link BiConsumer}.
+	 * @param <A> first input type
+	 * @param <B> second input type
+	 * @return {@link BiConsumer} which rethrows all checked exceptions as unchecked.
+	 */
+	static <A, B> BiConsumer<A, B> unchecked(BiConsumerWithException<A, B, ?> biConsumerWithException) {
+		return (A a, B b) -> {
+			try {
+				biConsumerWithException.accept(a, b);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
index 967c737e584..ccba8a7e774 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
@@ -31,7 +31,7 @@
  * @param <E> type of the exception which can be thrown
  */
 @FunctionalInterface
-public interface BiFunctionWithException<T, U, R, E extends Throwable> extends BiFunction<T, U, R> {
+public interface BiFunctionWithException<T, U, R, E extends Throwable> {
 
 	/**
 	 * Apply the given values t and u to obtain the resulting value. The operation can
@@ -42,16 +42,25 @@
 	 * @return result value
 	 * @throws E if the operation fails
 	 */
-	R applyWithException(T t, U u) throws E;
+	R apply(T t, U u) throws E;
 
-	default R apply(T t, U u) {
-		try {
-			return applyWithException(t, u);
-		} catch (Throwable e) {
-			ExceptionUtils.rethrow(e);
-			// we have to return a value to please the compiler
-			// but we will never reach the code here
-			return null;
-		}
+	/**
+	 * Convert at {@link BiFunctionWithException} into a {@link BiFunction}.
+	 *
+	 * @param biFunctionWithException function with exception to convert into a function
+	 * @param <A> input type
+	 * @param <B> output type
+	 * @return {@link BiFunction} which throws all checked exception as an unchecked exception.
+	 */
+	static <A, B, C> BiFunction<A, B, C> unchecked(BiFunctionWithException<A, B, C, ?> biFunctionWithException) {
+		return (A a, B b) -> {
+			try {
+				return biFunctionWithException.apply(a, b);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+				// we need this to appease the compiler :-(
+				return null;
+			}
+		};
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
deleted file mode 100644
index 09507d4e9f2..00000000000
--- a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util.function;
-
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.function.Consumer;
-
-/**
- * A checked extension of the {@link Consumer} interface.
- *
- * @param <T> type of the first argument
- * @param <E> type of the thrown exception
- */
-public interface ConsumerWithException<T, E extends Throwable> extends Consumer<T> {
-
-	void acceptWithException(T value) throws E;
-
-	@Override
-	default void accept(T value) {
-		try {
-			acceptWithException(value);
-		} catch (Throwable t) {
-			ExceptionUtils.rethrow(t);
-		}
-	}
-}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
new file mode 100644
index 00000000000..678ef9f78b6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Utility class for Flink's functions.
+ */
+public class FunctionUtils {
+
+	private FunctionUtils() {
+		throw new UnsupportedOperationException("This class should never be instantiated.");
+	}
+
+	/**
+	 * Convert at {@link FunctionWithException} into a {@link Function}.
+	 *
+	 * @param functionWithException function with exception to convert into a function
+	 * @param <A> input type
+	 * @param <B> output type
+	 * @return {@link Function} which throws all checked exception as an unchecked exception.
+	 */
+	public static <A, B> Function<A, B> uncheckedFunction(FunctionWithException<A, B, ?> functionWithException) {
+		return (A value) -> {
+			try {
+				return functionWithException.apply(value);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+				// we need this to appease the compiler :-(
+				return null;
+			}
+		};
+	}
+
+	/**
+	 * Converts a {@link ThrowingConsumer} into a {@link Consumer} which throws checked exceptions
+	 * as unchecked.
+	 *
+	 * @param throwingConsumer to convert into a {@link Consumer}
+	 * @param <A> input type
+	 * @return {@link Consumer} which throws all checked exceptions as unchecked
+	 */
+	public static <A> Consumer<A> uncheckedConsumer(ThrowingConsumer<A, ?> throwingConsumer) {
+		return (A value) -> {
+			try {
+				throwingConsumer.accept(value);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
index 4fef4207838..0dd4047a1e5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.util.function;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
 
 /**
  * Similar to a {@link Runnable}, this interface is used to capture a block of code
@@ -35,4 +36,21 @@
 	 * @throws E Exceptions may be thrown.
 	 */
 	void run() throws E;
+
+	/**
+	 * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions
+	 * as unchecked.
+	 *
+	 * @param throwingRunnable to convert into a {@link Runnable}
+	 * @return {@link Runnable} which throws all checked exceptions as unchecked.
+	 */
+	static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
+		return () -> {
+			try {
+				throwingRunnable.run();
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 131733924ae..e443fc21552 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,7 +25,7 @@
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
@@ -246,7 +246,7 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
 		LOG.debug("Added {} to {}.", checkpoint, path);
 	}
 
-	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> discardCallback) {
+	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) {
 		try {
 			if (tryRemove(completedCheckpoint.getCheckpointID())) {
 				executor.execute(() -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c31e64c0adc..5279e502a93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -60,11 +60,13 @@
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -128,6 +130,8 @@
 
 	private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
 
+	private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);
+
 	public Dispatcher(
 			RpcService rpcService,
 			String endpointId,
@@ -629,31 +633,51 @@ private void terminateJobManagerRunners() {
 	 * Recovers all jobs persisted via the submitted job graph store.
 	 */
 	@VisibleForTesting
-	CompletableFuture<Collection<JobGraph>> recoverJobs() {
+	Collection<JobGraph> recoverJobs() throws Exception {
 		log.info("Recovering all persisted jobs.");
-		return FutureUtils.supplyAsync(
-			() -> {
-				final Collection<JobID> jobIds = submittedJobGraphStore.getJobIds();
-
-				final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());
+		final Collection<JobID> jobIds = submittedJobGraphStore.getJobIds();
 
-				for (JobID jobId : jobIds) {
-					jobGraphs.add(recoverJob(jobId));
+		try {
+			return recoverJobGraphs(jobIds);
+		} catch (Exception e) {
+			// release all recovered job graphs
+			for (JobID jobId : jobIds) {
+				try {
+					submittedJobGraphStore.releaseJobGraph(jobId);
+				} catch (Exception ie) {
+					e.addSuppressed(ie);
 				}
+			}
+			throw e;
+		}
+	}
 
-				return jobGraphs;
-			},
-			getRpcService().getExecutor());
+	@Nonnull
+	private Collection<JobGraph> recoverJobGraphs(Collection<JobID> jobIds) throws Exception {
+		final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());
+
+		for (JobID jobId : jobIds) {
+			final JobGraph jobGraph = recoverJob(jobId);
+
+			if (jobGraph == null) {
+				throw new FlinkJobNotFoundException(jobId);
+			}
+
+			jobGraphs.add(jobGraph);
+		}
+
+		return jobGraphs;
 	}
 
+	@Nullable
 	private JobGraph recoverJob(JobID jobId) throws Exception {
 		log.debug("Recover job {}.", jobId);
-		SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
+		final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
 
 		if (submittedJobGraph != null) {
 			return submittedJobGraph.getJobGraph();
 		} else {
-			throw new FlinkJobNotFoundException(jobId);
+			return null;
 		}
 	}
 
@@ -768,27 +792,40 @@ private void jobMasterFailed(JobID jobId, Throwable cause) {
 	 */
 	@Override
 	public void grantLeadership(final UUID newLeaderSessionID) {
-		log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
+		runAsyncWithoutFencing(
+			() -> {
+				log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
 
-		final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();
+				final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoveryOperation.thenApplyAsync(
+					FunctionUtils.uncheckedFunction(ignored -> recoverJobs()),
+					getRpcService().getExecutor());
 
-		final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
-			(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
-			getUnfencedMainThreadExecutor());
+				final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
+					(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
+					getUnfencedMainThreadExecutor());
 
-		final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenAcceptAsync(
-			(Boolean confirmLeadership) -> {
-				if (confirmLeadership) {
-					leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
-				}
-			},
-			getRpcService().getExecutor());
+				final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenCombineAsync(
+					recoveredJobsFuture,
+					BiFunctionWithException.unchecked((Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
+						if (confirmLeadership) {
+							leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+						} else {
+							for (JobGraph recoveredJob : recoveredJobs) {
+								submittedJobGraphStore.releaseJobGraph(recoveredJob.getJobID());
+							}
+						}
+						return null;
+					}),
+					getRpcService().getExecutor());
+
+				confirmationFuture.whenComplete(
+					(Void ignored, Throwable throwable) -> {
+						if (throwable != null) {
+							onFatalError(ExceptionUtils.stripCompletionException(throwable));
+						}
+					});
 
-		confirmationFuture.whenComplete(
-			(Void ignored, Throwable throwable) -> {
-				if (throwable != null) {
-					onFatalError(ExceptionUtils.stripCompletionException(throwable));
-				}
+				recoveryOperation = confirmationFuture;
 			});
 	}
 
@@ -813,7 +850,7 @@ public void grantLeadership(final UUID newLeaderSessionID) {
 		}
 	}
 
-	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
+	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
 		final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
 			.exceptionally((Throwable throwable) -> {
 				throw new CompletionException(
@@ -822,14 +859,14 @@ public void grantLeadership(final UUID newLeaderSessionID) {
 						throwable)); });
 
 		return jobManagerTerminationFuture.thenRunAsync(
-			() -> {
+			ThrowingRunnable.unchecked(() -> {
 				jobManagerTerminationFutures.remove(jobId);
 				action.accept(jobGraph);
-			},
+			}),
 			getMainThreadExecutor());
 	}
 
-	protected CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+	CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
 		if (jobManagerRunners.containsKey(jobId)) {
 			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
 		} else {
@@ -837,6 +874,11 @@ public void grantLeadership(final UUID newLeaderSessionID) {
 		}
 	}
 
+	@VisibleForTesting
+	CompletableFuture<Void> getRecoveryOperation() {
+		return recoveryOperation;
+	}
+
 	private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
 		// clear the state if we've been the leader before
 		if (getFencingToken() != null) {
@@ -879,24 +921,63 @@ public void handleError(final Exception exception) {
 
 	@Override
 	public void onAddedJobGraph(final JobID jobId) {
-		final CompletableFuture<SubmittedJobGraph> recoveredJob = getRpcService().execute(
-			() -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-		final CompletableFuture<Acknowledge> submissionFuture = recoveredJob.thenComposeAsync(
-			(SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-			getMainThreadExecutor());
-
-		submissionFuture.whenComplete(
-			(Acknowledge acknowledge, Throwable throwable) -> {
-				if (throwable != null) {
-					onFatalError(
-						new DispatcherException(
-							String.format("Could not start the added job %s", jobId),
-							ExceptionUtils.stripCompletionException(throwable)));
+		runAsync(
+			() -> {
+				if (!jobManagerRunners.containsKey(jobId)) {
+					// IMPORTANT: onAddedJobGraph can generate false positives and, thus, we must expect that
+					// the specified job is already removed from the SubmittedJobGraphStore. In this case,
+					// SubmittedJobGraphStore.recoverJob returns null.
+					final CompletableFuture<Optional<JobGraph>> recoveredJob = recoveryOperation.thenApplyAsync(
+						FunctionUtils.uncheckedFunction(ignored -> Optional.ofNullable(recoverJob(jobId))),
+						getRpcService().getExecutor());
+
+					final DispatcherId dispatcherId = getFencingToken();
+					final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
+						(Optional<JobGraph> jobGraphOptional) -> jobGraphOptional.map(
+							FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
+								FunctionUtils.uncheckedConsumer((Boolean isRecoveredJobRunning) -> {
+										if (!isRecoveredJobRunning) {
+											submittedJobGraphStore.releaseJobGraph(jobId);
+										}
+									}),
+									getRpcService().getExecutor())))
+							.orElse(CompletableFuture.completedFuture(null)),
+						getUnfencedMainThreadExecutor());
+
+					submissionFuture.whenComplete(
+						(Void ignored, Throwable throwable) -> {
+							if (throwable != null) {
+								onFatalError(
+									new DispatcherException(
+										String.format("Could not start the added job %s", jobId),
+										ExceptionUtils.stripCompletionException(throwable)));
+							}
+						});
+
+					recoveryOperation = submissionFuture;
 				}
 			});
 	}
 
+	private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
+		if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
+			final JobID jobId = jobGraph.getJobID();
+			if (jobManagerRunners.containsKey(jobId)) {
+				// we must not release the job graph lock since it can only be locked once and
+				// is currently being executed. Once we support multiple locks, we must release
+				// the JobGraph here
+				log.debug("Ignore added JobGraph because the job {} is already running.", jobId);
+				return CompletableFuture.completedFuture(true);
+			} else if (runningJobsRegistry.getJobSchedulingStatus(jobId) != RunningJobsRegistry.JobSchedulingStatus.DONE) {
+				return waitForTerminatingJobManager(jobId, jobGraph, this::runJob).thenApply(ignored -> true);
+			} else {
+				log.debug("Ignore added JobGraph because the job {} has already been completed.", jobId);
+			}
+		}
+
+		return CompletableFuture.completedFuture(false);
+	}
+
 	@Override
 	public void onRemovedJobGraph(final JobID jobId) {
 		runAsync(() -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index ddd3751cc2a..0fd4389fc35 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -708,6 +708,12 @@ protected static Configuration loadConfiguration(EntrypointClusterConfiguration
 			configuration.setInteger(RestOptions.PORT, restPort);
 		}
 
+		final String hostname = entrypointClusterConfiguration.getHostname();
+
+		if (hostname != null) {
+			configuration.setString(JobManagerOptions.ADDRESS, hostname);
+		}
+
 		return configuration;
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
index 75cad7aa946..3472f35d6a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.entrypoint;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Properties;
 
@@ -27,14 +28,23 @@
  */
 public class EntrypointClusterConfiguration extends ClusterConfiguration {
 
+	@Nullable
+	private final String hostname;
+
 	private final int restPort;
 
-	public EntrypointClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort) {
+	public EntrypointClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String hostname, int restPort) {
 		super(configDir, dynamicProperties, args);
+		this.hostname = hostname;
 		this.restPort = restPort;
 	}
 
 	public int getRestPort() {
 		return restPort;
 	}
+
+	@Nullable
+	public String getHostname() {
+		return hostname;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
index 7dfb784a79c..52f59eeef8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
@@ -29,6 +29,8 @@
 
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.EXECUTION_MODE_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
 
 /**
@@ -42,6 +44,8 @@ public Options getOptions() {
 		options.addOption(CONFIG_DIR_OPTION);
 		options.addOption(REST_PORT_OPTION);
 		options.addOption(DYNAMIC_PROPERTY_OPTION);
+		options.addOption(HOST_OPTION);
+		options.addOption(EXECUTION_MODE_OPTION);
 
 		return options;
 	}
@@ -52,11 +56,13 @@ public EntrypointClusterConfiguration createResult(@Nonnull CommandLine commandL
 		final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
 		final String restPortStr = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
 		final int restPort = Integer.parseInt(restPortStr);
+		final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
 
 		return new EntrypointClusterConfiguration(
 			configDir,
 			dynamicProperties,
 			commandLine.getArgs(),
+			hostname,
 			restPort);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
index 23c9da2485f..443014b9903 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
@@ -48,5 +48,26 @@
 		.desc("use value for given property")
 		.build();
 
+	public static final Option HOST_OPTION = Option.builder("h")
+		.longOpt("host")
+		.required(false)
+		.hasArg(true)
+		.argName("hostname")
+		.desc("Hostname for the RPC service.")
+		.build();
+
+	/**
+	 * @deprecated exists only for compatibility with legacy mode. Remove once legacy mode
+	 * and execution mode option has been removed.
+	 */
+	@Deprecated
+	public static final Option EXECUTION_MODE_OPTION = Option.builder("x")
+		.longOpt("executionMode")
+		.required(false)
+		.hasArg(true)
+		.argName("execution mode")
+		.desc("Deprecated option")
+		.build();
+
 	private CommandLineOptions() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 4a66d32a2ac..736984e88e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1483,7 +1483,7 @@ private void rescaleJobGraph(Collection<JobVertexID> operators, int newParalleli
 				jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
 			}
 
-			rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+			rescalingBehaviour.accept(jobVertex, newParallelism);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
index 7de956081d8..64e2ffa1124 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
@@ -29,7 +29,7 @@
 	// rescaling is only executed if the operator can be set to the given parallelism
 	STRICT {
 		@Override
-		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
+		public void accept(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
 			if (jobVertex.getMaxParallelism() < newParallelism) {
 				throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() +
 					" because its maximum parallelism " + jobVertex.getMaxParallelism() +
@@ -42,7 +42,7 @@ public void acceptWithException(JobVertex jobVertex, Integer newParallelism) thr
 	// the new parallelism will be the minimum of the given parallelism and the maximum parallelism
 	RELAXED {
 		@Override
-		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) {
+		public void accept(JobVertex jobVertex, Integer newParallelism) {
 			jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), newParallelism));
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 28af072a10c..38da82cb7fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -178,6 +178,7 @@ protected void initChannel(SocketChannel ch) {
 				.channel(NioServerSocketChannel.class)
 				.childHandler(initializer);
 
+			log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
 			final ChannelFuture channel;
 			if (restBindAddress == null) {
 				channel = bootstrap.bind(restBindPort);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 4d783352459..9c8f7bd1428 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -23,6 +23,7 @@
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -89,6 +90,23 @@ public static ActorGateway retrieveLeaderGateway(
 		}
 	}
 
+	/**
+	 * Retrieves the leader akka url and the current leader session ID. The values are stored in a
+	 * {@link LeaderConnectionInfo} instance.
+	 *
+	 * @param leaderRetrievalService Leader retrieval service to retrieve the leader connection
+	 *                               information
+	 * @param timeout Timeout when to give up looking for the leader
+	 * @return LeaderConnectionInfo containing the leader's akka URL and the current leader session
+	 * ID
+	 * @throws LeaderRetrievalException
+	 */
+	public static LeaderConnectionInfo retrieveLeaderConnectionInfo(
+			LeaderRetrievalService leaderRetrievalService,
+			Time timeout) throws LeaderRetrievalException {
+		return retrieveLeaderConnectionInfo(leaderRetrievalService, FutureUtils.toFiniteDuration(timeout));
+	}
+
 	/**
 	 * Retrieves the leader akka url and the current leader session ID. The values are stored in a
 	 * {@link LeaderConnectionInfo} instance.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 8c3d31fc51b..b9cd0c1b720 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -526,22 +526,13 @@ protected String getLockPath(String rootPath) {
 				client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
 			} catch (KeeperException.NodeExistsException ignored) {
 				// we have already created the lock
-			} catch (KeeperException.NoNodeException e) {
-				throw new Exception("Cannot lock the node " + path + " since it does not exist.", e);
 			}
 		}
 
 		boolean success = false;
 
 		try {
-			byte[] data;
-
-			try {
-				data = client.getData().forPath(path);
-			} catch (Exception e) {
-				throw new Exception("Failed to retrieve state handle data under " + path +
-					" from ZooKeeper.", e);
-			}
+			byte[] data = client.getData().forPath(path);
 
 			try {
 				RetrievableStateHandle<T> retrievableStateHandle = InstantiationUtil.deserializeObject(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index cb26f4862b1..335199a2807 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -302,9 +302,7 @@ public void removeJobGraph(JobID jobId) throws Exception {
 		}
 
 		@Override
-		public void releaseJobGraph(JobID jobId) throws Exception {
-			throw new UnsupportedOperationException("Should not be called.");
-		}
+		public void releaseJobGraph(JobID jobId) throws Exception {}
 
 		@Override
 		public Collection<JobID> getJobIds() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d405fcdcf44..1af10b8c598 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -319,13 +319,13 @@ public void testOnAddedJobGraphWithFinishedJob() throws Throwable {
 		runningJobsRegistry.setJobFinished(TEST_JOB_ID);
 		dispatcher.onAddedJobGraph(TEST_JOB_ID);
 
-		final CompletableFuture<Throwable> errorFuture = fatalErrorHandler.getErrorFuture();
-
-		final Throwable throwable = errorFuture.get();
+		// wait until the recovery is over
+		dispatcher.getRecoverOperationFuture(TIMEOUT).get();
 
-		assertThat(throwable, instanceOf(DispatcherException.class));
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-		fatalErrorHandler.clearError();
+		// check that we did not start executing the added JobGraph
+		assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), is(empty()));
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 5141be039f7..6a623768bb2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.dispatcher;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -73,16 +72,20 @@
 			VoidHistoryServerArchivist.INSTANCE);
 	}
 
-	@VisibleForTesting
 	void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
 		runAsync(
 			() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
 	}
 
-	@VisibleForTesting
-	public CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
+	CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
 		return callAsyncWithoutFencing(
 			() -> getJobTerminationFuture(jobId),
 			timeout).thenCompose(Function.identity());
 	}
+
+	CompletableFuture<Void> getRecoverOperationFuture(@Nonnull Time timeout) {
+		return callAsyncWithoutFencing(
+			this::getRecoveryOperation,
+			timeout).thenCompose(Function.identity());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index dd0375886a9..b5662c064e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -24,17 +24,25 @@
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
@@ -56,8 +64,13 @@
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -67,8 +80,8 @@
 
 	private static final Time TIMEOUT = Time.seconds(10L);
 
-	@ClassRule
-	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+	@Rule
+	public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
 
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@@ -87,14 +100,14 @@
 	@BeforeClass
 	public static void setupClass() throws IOException {
 		configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
 		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
 		rpcService = new TestingRpcService();
 		blobServer = new BlobServer(configuration, new VoidBlobStore());
 	}
 
 	@Before
-	public void setup() {
+	public void setup() throws Exception {
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
 		testingFatalErrorHandler = new TestingFatalErrorHandler();
 	}
 
@@ -139,7 +152,9 @@ public void testSubmittedJobGraphRelease() throws Exception {
 			final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
 			testingHighAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
 
-			final TestingDispatcher dispatcher = createDispatcher(testingHighAvailabilityServices);
+			final TestingDispatcher dispatcher = createDispatcher(
+				testingHighAvailabilityServices,
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
 
 			dispatcher.start();
 
@@ -167,7 +182,7 @@ public void testSubmittedJobGraphRelease() throws Exception {
 				// recover the job
 				final SubmittedJobGraph submittedJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobId);
 
-				assertThat(submittedJobGraph, Matchers.is(Matchers.notNullValue()));
+				assertThat(submittedJobGraph, is(notNullValue()));
 
 				// check that the other submitted job graph store can remove the job graph after the original leader
 				// has lost its leadership
@@ -184,20 +199,145 @@ public void testSubmittedJobGraphRelease() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that a standby Dispatcher does not interfere with the clean up of a completed
+	 * job.
+	 */
+	@Test
+	public void testStandbyDispatcherJobExecution() throws Exception {
+		try (final TestingHighAvailabilityServices haServices1 = new TestingHighAvailabilityServices();
+			final TestingHighAvailabilityServices haServices2 = new TestingHighAvailabilityServices();
+			final CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore1 = ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
+			haServices1.setSubmittedJobGraphStore(submittedJobGraphStore1);
+			final TestingLeaderElectionService leaderElectionService1 = new TestingLeaderElectionService();
+			haServices1.setDispatcherLeaderElectionService(leaderElectionService1);
+
+			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore2 = ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
+			haServices2.setSubmittedJobGraphStore(submittedJobGraphStore2);
+			final TestingLeaderElectionService leaderElectionService2 = new TestingLeaderElectionService();
+			haServices2.setDispatcherLeaderElectionService(leaderElectionService2);
+
+			final CompletableFuture<JobGraph> jobGraphFuture = new CompletableFuture<>();
+			final CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
+			final TestingDispatcher dispatcher1 = createDispatcher(
+				haServices1,
+				new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture));
+
+			final TestingDispatcher dispatcher2 = createDispatcher(
+				haServices2,
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
+
+			try {
+				dispatcher1.start();
+				dispatcher2.start();
+
+				leaderElectionService1.isLeader(UUID.randomUUID()).get();
+				final DispatcherGateway dispatcherGateway1 = dispatcher1.getSelfGateway(DispatcherGateway.class);
+
+				final JobGraph jobGraph = DispatcherHATest.createNonEmptyJobGraph();
+
+				dispatcherGateway1.submitJob(jobGraph, TIMEOUT).get();
+
+				final CompletableFuture<JobResult> jobResultFuture = dispatcherGateway1.requestJobResult(jobGraph.getJobID(), TIMEOUT);
+
+				jobGraphFuture.get();
+
+				// complete the job
+				resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build());
+
+				final JobResult jobResult = jobResultFuture.get();
+
+				assertThat(jobResult.isSuccess(), is(true));
+
+				// wait for the completion of the job
+				dispatcher1.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();
+
+				// change leadership
+				leaderElectionService1.notLeader();
+				leaderElectionService2.isLeader(UUID.randomUUID()).get();
+
+				// Dispatcher 2 should not recover any jobs
+				final DispatcherGateway dispatcherGateway2 = dispatcher2.getSelfGateway(DispatcherGateway.class);
+				assertThat(dispatcherGateway2.listJobs(TIMEOUT).get(), is(empty()));
+			} finally {
+				RpcUtils.terminateRpcEndpoint(dispatcher1, TIMEOUT);
+				RpcUtils.terminateRpcEndpoint(dispatcher2, TIMEOUT);
+			}
+		}
+	}
+
+	/**
+	 * Tests that a standby {@link Dispatcher} can recover all submitted jobs.
+	 */
+	@Test
+	public void testStandbyDispatcherJobRecovery() throws Exception {
+		try (CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+			HighAvailabilityServices haServices = null;
+			Dispatcher dispatcher1 = null;
+			Dispatcher dispatcher2 = null;
+
+			try {
+				haServices = new ZooKeeperHaServices(curatorFramework, rpcService.getExecutor(), configuration, new VoidBlobStore());
+
+				final CompletableFuture<JobGraph> jobGraphFuture1 = new CompletableFuture<>();
+				dispatcher1 = createDispatcher(
+					haServices,
+					new TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>()));
+				final CompletableFuture<JobGraph> jobGraphFuture2 = new CompletableFuture<>();
+				dispatcher2 = createDispatcher(
+					haServices,
+					new TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>()));
+
+				dispatcher1.start();
+				dispatcher2.start();
+
+				final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), TIMEOUT);
+
+				final DispatcherGateway dispatcherGateway = rpcService.connect(leaderConnectionInfo.getAddress(), DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()), DispatcherGateway.class).get();
+
+				final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
+				dispatcherGateway.submitJob(nonEmptyJobGraph, TIMEOUT).get();
+
+				if (dispatcher1.getAddress().equals(leaderConnectionInfo.getAddress())) {
+					dispatcher1.shutDown();
+					assertThat(jobGraphFuture2.get().getJobID(), is(equalTo(nonEmptyJobGraph.getJobID())));
+				} else {
+					dispatcher2.shutDown();
+					assertThat(jobGraphFuture1.get().getJobID(), is(equalTo(nonEmptyJobGraph.getJobID())));
+				}
+			} finally {
+				if (dispatcher1 != null) {
+					RpcUtils.terminateRpcEndpoint(dispatcher1, TIMEOUT);
+				}
+
+				if (dispatcher2 != null) {
+					RpcUtils.terminateRpcEndpoint(dispatcher2, TIMEOUT);
+				}
+
+				if (haServices != null) {
+					haServices.close();
+				}
+			}
+		}
+	}
+
 	@Nonnull
-	private TestingDispatcher createDispatcher(TestingHighAvailabilityServices testingHighAvailabilityServices) throws Exception {
+	private TestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		return new TestingDispatcher(
 			rpcService,
-			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
+			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName() + UUID.randomUUID(),
 			configuration,
-			testingHighAvailabilityServices,
+			highAvailabilityServices,
 			new TestingResourceManagerGateway(),
 			blobServer,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
+			jobManagerRunnerFactory,
 			testingFatalErrorHandler);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
index da63b7fe046..906e9d5a663 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
@@ -46,7 +46,7 @@ public void testEntrypointClusterConfigurationParsing() throws FlinkParseExcepti
 		final String value = "value";
 		final String arg1 = "arg1";
 		final String arg2 = "arg2";
-		final String[] args = {"--configDir", configDir, "-r", String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2};
+		final String[] args = {"--configDir", configDir, "--executionMode", "cluster", "--host", "localhost",  "-r", String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2};
 
 		final EntrypointClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index 3b9c5786ca4..bf8751547ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -76,7 +76,7 @@ public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Except
 		verifyIsStarted();
 
 		if (recoverJobGraphFunction != null) {
-			return recoverJobGraphFunction.applyWithException(jobId, storedJobs);
+			return recoverJobGraphFunction.apply(jobId, storedJobs);
 		} else {
 			return requireNonNull(
 				storedJobs.get(jobId),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services