You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:14:53 UTC

[flink] branch release-1.5 updated (94a7a15 -> fea8959)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 94a7a15  [FLINK-10223][logging] Add 'ResourceID' to log message.
     new 5ee05d9  [hotfix] Add LeaderRetrievalUtils#retrieveLeaderConnectionInfo with Time timeout
     new 4b4ccec  [hotfix] Add FunctionUtils#uncheckedFunction to convert FunctionWithException into Function
     new 5a97f12  [FLINK-10255] Only react to onAddedJobGraph signal when being leader
     new 775d968  [hotfix] Add BiConsumerWithException#unchecked to convert into BiConsumer
     new 6d01816  [hotfix] Add BiFunctionWithException#unchecked to convert into BiFunction
     new 92bcf53  [hotfix] Add ThrowingRunnable#unchecked and FunctionUtils#uncheckedConsumer
     new 3178213  [hotfix] Add --host config option to ClusterEntrypoint
     new eae90b7  [FLINK-10329] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed
     new fea8959  [FLINK-10328] Release all locks when stopping the ZooKeeperSubmittedJobGraphStore

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../util/function/BiConsumerWithException.java     |  27 ++-
 .../util/function/BiFunctionWithException.java     |  31 ++--
 .../flink/util/function/ConsumerWithException.java |  43 -----
 .../apache/flink/util/function/FunctionUtils.java  |  72 ++++++++
 .../flink/util/function/ThrowingRunnable.java      |  18 ++
 .../ZooKeeperCompletedCheckpointStore.java         |   4 +-
 .../flink/runtime/dispatcher/Dispatcher.java       | 181 +++++++++++++++------
 .../runtime/entrypoint/ClusterConfiguration.java   |  13 +-
 .../runtime/entrypoint/ClusterEntrypoint.java      |  11 +-
 .../ZooKeeperSubmittedJobGraphStore.java           |  29 +++-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   2 +-
 .../runtime/jobmaster/RescalingBehaviour.java      |   4 +-
 .../flink/runtime/rest/RestServerEndpoint.java     |   1 +
 .../flink/runtime/util/LeaderRetrievalUtils.java   |  18 ++
 .../zookeeper/ZooKeeperStateHandleStore.java       |  11 +-
 .../TestingRetrievableStateStorageHelper.java      |  63 +++++++
 .../ZooKeeperCompletedCheckpointStoreTest.java     |  36 ----
 .../flink/runtime/dispatcher/DispatcherHATest.java |   4 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   |  10 +-
 .../runtime/dispatcher/TestingDispatcher.java      |  11 +-
 .../dispatcher/ZooKeeperHADispatcherTest.java      | 160 ++++++++++++++++--
 .../ZooKeeperSubmittedJobGraphStoreTest.java       | 106 ++++++++++++
 .../testutils/InMemorySubmittedJobGraphStore.java  |   2 +-
 23 files changed, 662 insertions(+), 195 deletions(-)
 delete mode 100644 flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java


[flink] 01/09: [hotfix] Add LeaderRetrievalUtils#retrieveLeaderConnectionInfo with Time timeout

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ee05d91957a9cb5dfc88169b84cba72cd33acd5
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Sep 10 14:42:32 2018 +0200

    [hotfix] Add LeaderRetrievalUtils#retrieveLeaderConnectionInfo with Time timeout
---
 .../flink/runtime/util/LeaderRetrievalUtils.java       | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 4d78335..9c8f7bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -102,6 +103,23 @@ public class LeaderRetrievalUtils {
 	 */
 	public static LeaderConnectionInfo retrieveLeaderConnectionInfo(
 			LeaderRetrievalService leaderRetrievalService,
+			Time timeout) throws LeaderRetrievalException {
+		return retrieveLeaderConnectionInfo(leaderRetrievalService, FutureUtils.toFiniteDuration(timeout));
+	}
+
+	/**
+	 * Retrieves the leader akka url and the current leader session ID. The values are stored in a
+	 * {@link LeaderConnectionInfo} instance.
+	 *
+	 * @param leaderRetrievalService Leader retrieval service to retrieve the leader connection
+	 *                               information
+	 * @param timeout Timeout when to give up looking for the leader
+	 * @return LeaderConnectionInfo containing the leader's akka URL and the current leader session
+	 * ID
+	 * @throws LeaderRetrievalException
+	 */
+	public static LeaderConnectionInfo retrieveLeaderConnectionInfo(
+			LeaderRetrievalService leaderRetrievalService,
 			FiniteDuration timeout
 	) throws LeaderRetrievalException {
 		LeaderConnectionInfoListener listener = new LeaderConnectionInfoListener();


[flink] 02/09: [hotfix] Add FunctionUtils#uncheckedFunction to convert FunctionWithException into Function

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b4ccec403c5825517fd7e1960a0f38f1c99080d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Sep 11 14:24:18 2018 +0200

    [hotfix] Add FunctionUtils#uncheckedFunction to convert FunctionWithException into Function
---
 .../apache/flink/util/function/FunctionUtils.java  | 53 ++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
new file mode 100644
index 0000000..c15ece1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.function.Function;
+
+/**
+ * Utility class for Flink's functions.
+ */
+public class FunctionUtils {
+
+	private FunctionUtils() {
+		throw new UnsupportedOperationException("This class should never be instantiated.");
+	}
+
+	/**
+	 * Convert at {@link FunctionWithException} into a {@link Function}.
+	 *
+	 * @param functionWithException function with exception to convert into a function
+	 * @param <A> input type
+	 * @param <B> output type
+	 * @return {@link Function} which throws all checked exception as an unchecked exception.
+	 */
+	public static <A, B> Function<A, B> uncheckedFunction(FunctionWithException<A, B, ?> functionWithException) {
+		return (A value) -> {
+			try {
+				return functionWithException.apply(value);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+				// we need this to appease the compiler :-(
+				return null;
+			}
+		};
+	}
+}


[flink] 04/09: [hotfix] Add BiConsumerWithException#unchecked to convert into BiConsumer

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 775d9685959344549f433e6679c54ceacc1a98fd
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 10:15:37 2018 +0200

    [hotfix] Add BiConsumerWithException#unchecked to convert into BiConsumer
---
 .../util/function/BiConsumerWithException.java     | 27 ++++++++++++++--------
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  2 +-
 .../runtime/jobmaster/RescalingBehaviour.java      |  4 ++--
 3 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
index 5864c8a..6fc5b76 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -30,7 +30,7 @@ import java.util.function.BiConsumer;
  * @param <E> type of the thrown exception
  */
 @FunctionalInterface
-public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T, U> {
+public interface BiConsumerWithException<T, U, E extends Throwable> {
 
 	/**
 	 * Performs this operation on the given arguments.
@@ -39,14 +39,23 @@ public interface BiConsumerWithException<T, U, E extends Throwable> extends BiCo
 	 * @param u the second input argument
 	 * @throws E in case of an error
 	 */
-	void acceptWithException(T t, U u) throws E;
+	void accept(T t, U u) throws E;
 
-	@Override
-	default void accept(T t, U u) {
-		try {
-			acceptWithException(t, u);
-		} catch (Throwable e) {
-			ExceptionUtils.rethrow(e);
-		}
+	/**
+	 * Convert a {@link BiConsumerWithException} into a {@link BiConsumer}.
+	 *
+	 * @param biConsumerWithException BiConsumer with exception to convert into a {@link BiConsumer}.
+	 * @param <A> first input type
+	 * @param <B> second input type
+	 * @return {@link BiConsumer} which rethrows all checked exceptions as unchecked.
+	 */
+	static <A, B> BiConsumer<A, B> unchecked(BiConsumerWithException<A, B, ?> biConsumerWithException) {
+		return (A a, B b) -> {
+			try {
+				biConsumerWithException.accept(a, b);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 8b34a78..13a06b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1482,7 +1482,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 				jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
 			}
 
-			rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+			rescalingBehaviour.accept(jobVertex, newParallelism);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
index 7de9560..64e2ffa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
@@ -29,7 +29,7 @@ public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Int
 	// rescaling is only executed if the operator can be set to the given parallelism
 	STRICT {
 		@Override
-		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
+		public void accept(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
 			if (jobVertex.getMaxParallelism() < newParallelism) {
 				throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() +
 					" because its maximum parallelism " + jobVertex.getMaxParallelism() +
@@ -42,7 +42,7 @@ public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Int
 	// the new parallelism will be the minimum of the given parallelism and the maximum parallelism
 	RELAXED {
 		@Override
-		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) {
+		public void accept(JobVertex jobVertex, Integer newParallelism) {
 			jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), newParallelism));
 		}
 	}


[flink] 03/09: [FLINK-10255] Only react to onAddedJobGraph signal when being leader

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5a97f12c339ed3c0b6798c9fc0fd17910689099d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Sep 8 15:19:37 2018 +0200

    [FLINK-10255] Only react to onAddedJobGraph signal when being leader
    
    The Dispatcher should only react to the onAddedJobGraph signal if it is the leader.
    In all other cases the signal should be ignored since the jobs will be recovered once
    the Dispatcher becomes the leader.
    
    In order to still support non-blocking job recoveries, this commit serializes all
    recovery operations by introducing a recoveryOperation future which first needs to
    complete before a subsequent operation is started. That way we can avoid race conditions
    between granting and revoking leadership as well as the onAddedJobGraph signals. This is
    important since we can only lock each JobGraph once and, thus, need to make sure that
    we don't release a lock of a properly recovered job in a concurrent operation.
    
    This closes #6678.
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 172 +++++++++++++++------
 .../zookeeper/ZooKeeperStateHandleStore.java       |  11 +-
 .../flink/runtime/dispatcher/DispatcherHATest.java |   4 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   |  10 +-
 .../runtime/dispatcher/TestingDispatcher.java      |  11 +-
 .../dispatcher/ZooKeeperHADispatcherTest.java      | 160 +++++++++++++++++--
 6 files changed, 290 insertions(+), 78 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c31e64c..40857aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -60,11 +60,12 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPre
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiFunctionWithException;
 import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.FunctionUtils;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -128,6 +129,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
 
+	private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);
+
 	public Dispatcher(
 			RpcService rpcService,
 			String endpointId,
@@ -629,31 +632,51 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	 * Recovers all jobs persisted via the submitted job graph store.
 	 */
 	@VisibleForTesting
-	CompletableFuture<Collection<JobGraph>> recoverJobs() {
+	Collection<JobGraph> recoverJobs() throws Exception {
 		log.info("Recovering all persisted jobs.");
-		return FutureUtils.supplyAsync(
-			() -> {
-				final Collection<JobID> jobIds = submittedJobGraphStore.getJobIds();
-
-				final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());
+		final Collection<JobID> jobIds = submittedJobGraphStore.getJobIds();
 
-				for (JobID jobId : jobIds) {
-					jobGraphs.add(recoverJob(jobId));
+		try {
+			return recoverJobGraphs(jobIds);
+		} catch (Exception e) {
+			// release all recovered job graphs
+			for (JobID jobId : jobIds) {
+				try {
+					submittedJobGraphStore.releaseJobGraph(jobId);
+				} catch (Exception ie) {
+					e.addSuppressed(ie);
 				}
+			}
+			throw e;
+		}
+	}
 
-				return jobGraphs;
-			},
-			getRpcService().getExecutor());
+	@Nonnull
+	private Collection<JobGraph> recoverJobGraphs(Collection<JobID> jobIds) throws Exception {
+		final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());
+
+		for (JobID jobId : jobIds) {
+			final JobGraph jobGraph = recoverJob(jobId);
+
+			if (jobGraph == null) {
+				throw new FlinkJobNotFoundException(jobId);
+			}
+
+			jobGraphs.add(jobGraph);
+		}
+
+		return jobGraphs;
 	}
 
+	@Nullable
 	private JobGraph recoverJob(JobID jobId) throws Exception {
 		log.debug("Recover job {}.", jobId);
-		SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
+		final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
 
 		if (submittedJobGraph != null) {
 			return submittedJobGraph.getJobGraph();
 		} else {
-			throw new FlinkJobNotFoundException(jobId);
+			return null;
 		}
 	}
 
@@ -768,27 +791,40 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	 */
 	@Override
 	public void grantLeadership(final UUID newLeaderSessionID) {
-		log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
+		runAsyncWithoutFencing(
+			() -> {
+				log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
 
-		final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();
+				final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoveryOperation.thenApplyAsync(
+					FunctionUtils.uncheckedFunction(ignored -> recoverJobs()),
+					getRpcService().getExecutor());
 
-		final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
-			(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
-			getUnfencedMainThreadExecutor());
+				final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
+					(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
+					getUnfencedMainThreadExecutor());
 
-		final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenAcceptAsync(
-			(Boolean confirmLeadership) -> {
-				if (confirmLeadership) {
-					leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
-				}
-			},
-			getRpcService().getExecutor());
+				final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenCombineAsync(
+					recoveredJobsFuture,
+					(BiFunctionWithException<Boolean, Collection<JobGraph>, Void, Exception>) (Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
+						if (confirmLeadership) {
+							leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+						} else {
+							for (JobGraph recoveredJob : recoveredJobs) {
+								submittedJobGraphStore.releaseJobGraph(recoveredJob.getJobID());
+							}
+						}
+						return null;
+					},
+					getRpcService().getExecutor());
+
+				confirmationFuture.whenComplete(
+					(Void ignored, Throwable throwable) -> {
+						if (throwable != null) {
+							onFatalError(ExceptionUtils.stripCompletionException(throwable));
+						}
+					});
 
-		confirmationFuture.whenComplete(
-			(Void ignored, Throwable throwable) -> {
-				if (throwable != null) {
-					onFatalError(ExceptionUtils.stripCompletionException(throwable));
-				}
+				recoveryOperation = confirmationFuture;
 			});
 	}
 
@@ -829,7 +865,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			getMainThreadExecutor());
 	}
 
-	protected CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+	CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
 		if (jobManagerRunners.containsKey(jobId)) {
 			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
 		} else {
@@ -837,6 +873,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		}
 	}
 
+	@VisibleForTesting
+	CompletableFuture<Void> getRecoveryOperation() {
+		return recoveryOperation;
+	}
+
 	private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
 		// clear the state if we've been the leader before
 		if (getFencingToken() != null) {
@@ -879,24 +920,63 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	@Override
 	public void onAddedJobGraph(final JobID jobId) {
-		final CompletableFuture<SubmittedJobGraph> recoveredJob = getRpcService().execute(
-			() -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-		final CompletableFuture<Acknowledge> submissionFuture = recoveredJob.thenComposeAsync(
-			(SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-			getMainThreadExecutor());
-
-		submissionFuture.whenComplete(
-			(Acknowledge acknowledge, Throwable throwable) -> {
-				if (throwable != null) {
-					onFatalError(
-						new DispatcherException(
-							String.format("Could not start the added job %s", jobId),
-							ExceptionUtils.stripCompletionException(throwable)));
+		runAsync(
+			() -> {
+				if (!jobManagerRunners.containsKey(jobId)) {
+					// IMPORTANT: onAddedJobGraph can generate false positives and, thus, we must expect that
+					// the specified job is already removed from the SubmittedJobGraphStore. In this case,
+					// SubmittedJobGraphStore.recoverJob returns null.
+					final CompletableFuture<Optional<JobGraph>> recoveredJob = recoveryOperation.thenApplyAsync(
+						FunctionUtils.uncheckedFunction(ignored -> Optional.ofNullable(recoverJob(jobId))),
+						getRpcService().getExecutor());
+
+					final DispatcherId dispatcherId = getFencingToken();
+					final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
+						(Optional<JobGraph> jobGraphOptional) -> jobGraphOptional.map(
+							FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
+								(ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {
+										if (!isRecoveredJobRunning) {
+											submittedJobGraphStore.releaseJobGraph(jobId);
+										}
+									},
+									getRpcService().getExecutor())))
+							.orElse(CompletableFuture.completedFuture(null)),
+						getUnfencedMainThreadExecutor());
+
+					submissionFuture.whenComplete(
+						(Void ignored, Throwable throwable) -> {
+							if (throwable != null) {
+								onFatalError(
+									new DispatcherException(
+										String.format("Could not start the added job %s", jobId),
+										ExceptionUtils.stripCompletionException(throwable)));
+							}
+						});
+
+					recoveryOperation = submissionFuture;
 				}
 			});
 	}
 
+	private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
+		if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
+			final JobID jobId = jobGraph.getJobID();
+			if (jobManagerRunners.containsKey(jobId)) {
+				// we must not release the job graph lock since it can only be locked once and
+				// is currently being executed. Once we support multiple locks, we must release
+				// the JobGraph here
+				log.debug("Ignore added JobGraph because the job {} is already running.", jobId);
+				return CompletableFuture.completedFuture(true);
+			} else if (runningJobsRegistry.getJobSchedulingStatus(jobId) != RunningJobsRegistry.JobSchedulingStatus.DONE) {
+				return waitForTerminatingJobManager(jobId, jobGraph, this::runJob).thenApply(ignored -> true);
+			} else {
+				log.debug("Ignore added JobGraph because the job {} has already been completed.", jobId);
+			}
+		}
+
+		return CompletableFuture.completedFuture(false);
+	}
+
 	@Override
 	public void onRemovedJobGraph(final JobID jobId) {
 		runAsync(() -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index f266e36..4c4e5c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -526,22 +526,13 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 				client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
 			} catch (KeeperException.NodeExistsException ignored) {
 				// we have already created the lock
-			} catch (KeeperException.NoNodeException e) {
-				throw new Exception("Cannot lock the node " + path + " since it does not exist.", e);
 			}
 		}
 
 		boolean success = false;
 
 		try {
-			byte[] data;
-
-			try {
-				data = client.getData().forPath(path);
-			} catch (Exception e) {
-				throw new Exception("Failed to retrieve state handle data under " + path +
-					" from ZooKeeper.", e);
-			}
+			byte[] data = client.getData().forPath(path);
 
 			try {
 				RetrievableStateHandle<T> retrievableStateHandle = InstantiationUtil.deserializeObject(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index cb26f48..335199a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -302,9 +302,7 @@ public class DispatcherHATest extends TestLogger {
 		}
 
 		@Override
-		public void releaseJobGraph(JobID jobId) throws Exception {
-			throw new UnsupportedOperationException("Should not be called.");
-		}
+		public void releaseJobGraph(JobID jobId) throws Exception {}
 
 		@Override
 		public Collection<JobID> getJobIds() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d405fcd..1af10b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -319,13 +319,13 @@ public class DispatcherTest extends TestLogger {
 		runningJobsRegistry.setJobFinished(TEST_JOB_ID);
 		dispatcher.onAddedJobGraph(TEST_JOB_ID);
 
-		final CompletableFuture<Throwable> errorFuture = fatalErrorHandler.getErrorFuture();
-
-		final Throwable throwable = errorFuture.get();
+		// wait until the recovery is over
+		dispatcher.getRecoverOperationFuture(TIMEOUT).get();
 
-		assertThat(throwable, instanceOf(DispatcherException.class));
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-		fatalErrorHandler.clearError();
+		// check that we did not start executing the added JobGraph
+		assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), is(empty()));
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 5141be0..6a62376 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.dispatcher;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -73,16 +72,20 @@ class TestingDispatcher extends Dispatcher {
 			VoidHistoryServerArchivist.INSTANCE);
 	}
 
-	@VisibleForTesting
 	void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
 		runAsync(
 			() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
 	}
 
-	@VisibleForTesting
-	public CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
+	CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
 		return callAsyncWithoutFencing(
 			() -> getJobTerminationFuture(jobId),
 			timeout).thenCompose(Function.identity());
 	}
+
+	CompletableFuture<Void> getRecoverOperationFuture(@Nonnull Time timeout) {
+		return callAsyncWithoutFencing(
+			this::getRecoveryOperation,
+			timeout).thenCompose(Function.identity());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index dd03758..b5662c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -24,17 +24,25 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
@@ -56,8 +64,13 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -67,8 +80,8 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 
 	private static final Time TIMEOUT = Time.seconds(10L);
 
-	@ClassRule
-	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+	@Rule
+	public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
 
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@@ -87,14 +100,14 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 	@BeforeClass
 	public static void setupClass() throws IOException {
 		configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
 		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
 		rpcService = new TestingRpcService();
 		blobServer = new BlobServer(configuration, new VoidBlobStore());
 	}
 
 	@Before
-	public void setup() {
+	public void setup() throws Exception {
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
 		testingFatalErrorHandler = new TestingFatalErrorHandler();
 	}
 
@@ -139,7 +152,9 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 			final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
 			testingHighAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
 
-			final TestingDispatcher dispatcher = createDispatcher(testingHighAvailabilityServices);
+			final TestingDispatcher dispatcher = createDispatcher(
+				testingHighAvailabilityServices,
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
 
 			dispatcher.start();
 
@@ -167,7 +182,7 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 				// recover the job
 				final SubmittedJobGraph submittedJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobId);
 
-				assertThat(submittedJobGraph, Matchers.is(Matchers.notNullValue()));
+				assertThat(submittedJobGraph, is(notNullValue()));
 
 				// check that the other submitted job graph store can remove the job graph after the original leader
 				// has lost its leadership
@@ -184,20 +199,145 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that a standby Dispatcher does not interfere with the clean up of a completed
+	 * job.
+	 */
+	@Test
+	public void testStandbyDispatcherJobExecution() throws Exception {
+		try (final TestingHighAvailabilityServices haServices1 = new TestingHighAvailabilityServices();
+			final TestingHighAvailabilityServices haServices2 = new TestingHighAvailabilityServices();
+			final CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore1 = ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
+			haServices1.setSubmittedJobGraphStore(submittedJobGraphStore1);
+			final TestingLeaderElectionService leaderElectionService1 = new TestingLeaderElectionService();
+			haServices1.setDispatcherLeaderElectionService(leaderElectionService1);
+
+			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore2 = ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
+			haServices2.setSubmittedJobGraphStore(submittedJobGraphStore2);
+			final TestingLeaderElectionService leaderElectionService2 = new TestingLeaderElectionService();
+			haServices2.setDispatcherLeaderElectionService(leaderElectionService2);
+
+			final CompletableFuture<JobGraph> jobGraphFuture = new CompletableFuture<>();
+			final CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
+			final TestingDispatcher dispatcher1 = createDispatcher(
+				haServices1,
+				new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture));
+
+			final TestingDispatcher dispatcher2 = createDispatcher(
+				haServices2,
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
+
+			try {
+				dispatcher1.start();
+				dispatcher2.start();
+
+				leaderElectionService1.isLeader(UUID.randomUUID()).get();
+				final DispatcherGateway dispatcherGateway1 = dispatcher1.getSelfGateway(DispatcherGateway.class);
+
+				final JobGraph jobGraph = DispatcherHATest.createNonEmptyJobGraph();
+
+				dispatcherGateway1.submitJob(jobGraph, TIMEOUT).get();
+
+				final CompletableFuture<JobResult> jobResultFuture = dispatcherGateway1.requestJobResult(jobGraph.getJobID(), TIMEOUT);
+
+				jobGraphFuture.get();
+
+				// complete the job
+				resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build());
+
+				final JobResult jobResult = jobResultFuture.get();
+
+				assertThat(jobResult.isSuccess(), is(true));
+
+				// wait for the completion of the job
+				dispatcher1.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();
+
+				// change leadership
+				leaderElectionService1.notLeader();
+				leaderElectionService2.isLeader(UUID.randomUUID()).get();
+
+				// Dispatcher 2 should not recover any jobs
+				final DispatcherGateway dispatcherGateway2 = dispatcher2.getSelfGateway(DispatcherGateway.class);
+				assertThat(dispatcherGateway2.listJobs(TIMEOUT).get(), is(empty()));
+			} finally {
+				RpcUtils.terminateRpcEndpoint(dispatcher1, TIMEOUT);
+				RpcUtils.terminateRpcEndpoint(dispatcher2, TIMEOUT);
+			}
+		}
+	}
+
+	/**
+	 * Tests that a standby {@link Dispatcher} can recover all submitted jobs.
+	 */
+	@Test
+	public void testStandbyDispatcherJobRecovery() throws Exception {
+		try (CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+			HighAvailabilityServices haServices = null;
+			Dispatcher dispatcher1 = null;
+			Dispatcher dispatcher2 = null;
+
+			try {
+				haServices = new ZooKeeperHaServices(curatorFramework, rpcService.getExecutor(), configuration, new VoidBlobStore());
+
+				final CompletableFuture<JobGraph> jobGraphFuture1 = new CompletableFuture<>();
+				dispatcher1 = createDispatcher(
+					haServices,
+					new TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>()));
+				final CompletableFuture<JobGraph> jobGraphFuture2 = new CompletableFuture<>();
+				dispatcher2 = createDispatcher(
+					haServices,
+					new TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>()));
+
+				dispatcher1.start();
+				dispatcher2.start();
+
+				final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), TIMEOUT);
+
+				final DispatcherGateway dispatcherGateway = rpcService.connect(leaderConnectionInfo.getAddress(), DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()), DispatcherGateway.class).get();
+
+				final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
+				dispatcherGateway.submitJob(nonEmptyJobGraph, TIMEOUT).get();
+
+				if (dispatcher1.getAddress().equals(leaderConnectionInfo.getAddress())) {
+					dispatcher1.shutDown();
+					assertThat(jobGraphFuture2.get().getJobID(), is(equalTo(nonEmptyJobGraph.getJobID())));
+				} else {
+					dispatcher2.shutDown();
+					assertThat(jobGraphFuture1.get().getJobID(), is(equalTo(nonEmptyJobGraph.getJobID())));
+				}
+			} finally {
+				if (dispatcher1 != null) {
+					RpcUtils.terminateRpcEndpoint(dispatcher1, TIMEOUT);
+				}
+
+				if (dispatcher2 != null) {
+					RpcUtils.terminateRpcEndpoint(dispatcher2, TIMEOUT);
+				}
+
+				if (haServices != null) {
+					haServices.close();
+				}
+			}
+		}
+	}
+
 	@Nonnull
-	private TestingDispatcher createDispatcher(TestingHighAvailabilityServices testingHighAvailabilityServices) throws Exception {
+	private TestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		return new TestingDispatcher(
 			rpcService,
-			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
+			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName() + UUID.randomUUID(),
 			configuration,
-			testingHighAvailabilityServices,
+			highAvailabilityServices,
 			new TestingResourceManagerGateway(),
 			blobServer,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
+			jobManagerRunnerFactory,
 			testingFatalErrorHandler);
 	}
 }


[flink] 05/09: [hotfix] Add BiFunctionWithException#unchecked to convert into BiFunction

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6d01816e24a40b507d12607cedf7f40c6c5fa673
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 10:19:16 2018 +0200

    [hotfix] Add BiFunctionWithException#unchecked to convert into BiFunction
---
 .../util/function/BiFunctionWithException.java     | 31 ++++++++++++++--------
 .../flink/runtime/dispatcher/Dispatcher.java       |  4 +--
 .../testutils/InMemorySubmittedJobGraphStore.java  |  2 +-
 3 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
index 967c737..ccba8a7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
@@ -31,7 +31,7 @@ import java.util.function.BiFunction;
  * @param <E> type of the exception which can be thrown
  */
 @FunctionalInterface
-public interface BiFunctionWithException<T, U, R, E extends Throwable> extends BiFunction<T, U, R> {
+public interface BiFunctionWithException<T, U, R, E extends Throwable> {
 
 	/**
 	 * Apply the given values t and u to obtain the resulting value. The operation can
@@ -42,16 +42,25 @@ public interface BiFunctionWithException<T, U, R, E extends Throwable> extends B
 	 * @return result value
 	 * @throws E if the operation fails
 	 */
-	R applyWithException(T t, U u) throws E;
+	R apply(T t, U u) throws E;
 
-	default R apply(T t, U u) {
-		try {
-			return applyWithException(t, u);
-		} catch (Throwable e) {
-			ExceptionUtils.rethrow(e);
-			// we have to return a value to please the compiler
-			// but we will never reach the code here
-			return null;
-		}
+	/**
+	 * Converts a {@link BiFunctionWithException} into a {@link BiFunction}.
+	 *
+	 * @param biFunctionWithException function with exception to convert into a function
+	 * @param <A> type of the first input
+	 * @param <B> type of the second input (the result type is {@code C})
+	 * @return {@link BiFunction} which rethrows all checked exceptions as unchecked exceptions.
+	 */
+	static <A, B, C> BiFunction<A, B, C> unchecked(BiFunctionWithException<A, B, C, ?> biFunctionWithException) {
+		return (A a, B b) -> {
+			try {
+				return biFunctionWithException.apply(a, b);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+				// never reached (rethrow throws), but the compiler still requires a return value
+				return null;
+			}
+		};
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 40857aa..4986f1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -805,7 +805,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 				final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenCombineAsync(
 					recoveredJobsFuture,
-					(BiFunctionWithException<Boolean, Collection<JobGraph>, Void, Exception>) (Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
+					BiFunctionWithException.unchecked((Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
 						if (confirmLeadership) {
 							leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
 						} else {
@@ -814,7 +814,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 							}
 						}
 						return null;
-					},
+					}),
 					getRpcService().getExecutor());
 
 				confirmationFuture.whenComplete(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index 3b9c578..bf87515 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -76,7 +76,7 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
 		verifyIsStarted();
 
 		if (recoverJobGraphFunction != null) {
-			return recoverJobGraphFunction.applyWithException(jobId, storedJobs);
+			return recoverJobGraphFunction.apply(jobId, storedJobs);
 		} else {
 			return requireNonNull(
 				storedJobs.get(jobId),


[flink] 08/09: [FLINK-10329] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eae90b7869fea1d15a49cd7b4743ff87db40827a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 12 17:45:36 2018 +0200

    [FLINK-10329] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed
    
    Fail properly with an exception if we cannot remove the JobGraph in
    ZooKeeperSubmittedJobGraphStore#removeJobGraph. This is necessary in order to notify callers about
    the unsuccessful attempt.
---
 .../ZooKeeperSubmittedJobGraphStore.java           |  8 +-
 .../TestingRetrievableStateStorageHelper.java      | 63 ++++++++++++++
 .../ZooKeeperCompletedCheckpointStoreTest.java     | 36 --------
 .../ZooKeeperSubmittedJobGraphStoreTest.java       | 96 ++++++++++++++++++++++
 4 files changed, 164 insertions(+), 39 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 2b935af..343b22a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -264,9 +264,11 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 		synchronized (cacheLock) {
 			if (addedJobGraphs.contains(jobId)) {
-				jobGraphsInZooKeeper.releaseAndTryRemove(path);
-
-				addedJobGraphs.remove(jobId);
+				if (jobGraphsInZooKeeper.releaseAndTryRemove(path)) {
+					addedJobGraphs.remove(jobId);
+				} else {
+					throw new FlinkException(String.format("Could not remove job graph with job id %s from ZooKeeper.", jobId));
+				}
 			}
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
new file mode 100644
index 0000000..92bf1af
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+
+import java.io.Serializable;
+
+/**
+ * {@link RetrievableStateStorageHelper} implementation for testing purposes.
+ *
+ * @param <T> type of the element to store
+ */
+public final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
+
+	@Override
+	public RetrievableStateHandle<T> store(T state) {
+		return new TestingRetrievableStateHandle<>(state);
+	}
+
+	private static final class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
+
+		private static final long serialVersionUID = 137053380713794300L;
+
+		private final T state;
+
+		private TestingRetrievableStateHandle(T state) {
+			this.state = state;
+		}
+
+		@Override
+		public T retrieveState() {
+			return state;
+		}
+
+		@Override
+		public void discardState() {
+			// no op
+		}
+
+		@Override
+		public long getStateSize() {
+			return 0;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index f992d3b..a9cba88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -22,10 +22,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.util.TestLogger;
 
@@ -36,8 +34,6 @@ import org.junit.Test;
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
-import java.io.Serializable;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -129,36 +125,4 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 			Executors.directExecutor());
 	}
 
-	private static final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
-		@Override
-		public RetrievableStateHandle<T> store(T state) {
-			return new TestingRetrievableStateHandle<>(state);
-		}
-
-		private static class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
-
-			private static final long serialVersionUID = 137053380713794300L;
-
-			private final T state;
-
-			private TestingRetrievableStateHandle(T state) {
-				this.state = state;
-			}
-
-			@Override
-			public T retrieveState() throws IOException, ClassNotFoundException {
-				return state;
-			}
-
-			@Override
-			public void discardState() throws Exception {
-				// no op
-			}
-
-			@Override
-			public long getStateSize() {
-				return 0;
-			}
-		}
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
new file mode 100644
index 0000000..dde3b7a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link ZooKeeperSubmittedJobGraphStore}.
+ */
+public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger {
+
+	@Rule
+	public ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
+
+	private Configuration configuration;
+
+	@Before
+	public void setup() {
+		configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
+	}
+
+	/**
+	 * Tests that we fail with an exception if the job cannot be removed from the
+	 * ZooKeeperSubmittedJobGraphStore.
+	 */
+	@Test
+	public void testJobGraphRemovalFailure() throws Exception {
+		try (final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) {
+			final TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = new TestingRetrievableStateStorageHelper<>();
+			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
+			submittedJobGraphStore.start(null);
+			final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
+			otherSubmittedJobGraphStore.start(null);
+
+			final SubmittedJobGraph jobGraph = new SubmittedJobGraph(new JobGraph(), null);
+			submittedJobGraphStore.putJobGraph(jobGraph);
+
+			final SubmittedJobGraph recoveredJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobGraph.getJobId());
+
+			assertThat(recoveredJobGraph, is(notNullValue()));
+
+			try {
+				otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobId());
+				fail("It should not be possible to remove the JobGraph since the first store still has a lock on it.");
+			} catch (Exception ignored) {
+				// expected
+			}
+
+			otherSubmittedJobGraphStore.stop();
+		}
+	}
+
+	@Nonnull
+	public ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphStore(CuratorFramework client, TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
+		return new ZooKeeperSubmittedJobGraphStore(
+			client,
+			"/foobar",
+			stateStorage);
+	}
+
+}


[flink] 06/09: [hotfix] Add ThrowingRunnable#unchecked and FunctionUtils#uncheckedConsumer

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 92bcf53a692d805fe7b13bf735dc952a8d33eec7
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 10:26:35 2018 +0200

    [hotfix] Add ThrowingRunnable#unchecked and FunctionUtils#uncheckedConsumer
    
    ThrowingRunnable#unchecked converts a ThrowingRunnable into a Runnable which throws checked
    exceptions as unchecked ones. FunctionUtils#uncheckedConsumer(ThrowingConsumer) converts a
    ThrowingConsumer into a Consumer which throws checked exceptions as unchecked ones. This is
    necessary because ThrowingConsumer is public and we cannot add new methods to the interface.
---
 .../flink/util/function/ConsumerWithException.java | 43 ----------------------
 .../apache/flink/util/function/FunctionUtils.java  | 19 ++++++++++
 .../flink/util/function/ThrowingRunnable.java      | 18 +++++++++
 .../ZooKeeperCompletedCheckpointStore.java         |  4 +-
 .../flink/runtime/dispatcher/Dispatcher.java       | 13 ++++---
 5 files changed, 46 insertions(+), 51 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
deleted file mode 100644
index 09507d4..0000000
--- a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util.function;
-
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.function.Consumer;
-
-/**
- * A checked extension of the {@link Consumer} interface.
- *
- * @param <T> type of the first argument
- * @param <E> type of the thrown exception
- */
-public interface ConsumerWithException<T, E extends Throwable> extends Consumer<T> {
-
-	void acceptWithException(T value) throws E;
-
-	@Override
-	default void accept(T value) {
-		try {
-			acceptWithException(value);
-		} catch (Throwable t) {
-			ExceptionUtils.rethrow(t);
-		}
-	}
-}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
index c15ece1..678ef9f 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.util.function;
 
 import org.apache.flink.util.ExceptionUtils;
 
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -50,4 +51,22 @@ public class FunctionUtils {
 			}
 		};
 	}
+
+	/**
+	 * Converts a {@link ThrowingConsumer} into a {@link Consumer} which throws checked exceptions
+	 * as unchecked.
+	 *
+	 * @param throwingConsumer to convert into a {@link Consumer}
+	 * @param <A> input type
+	 * @return {@link Consumer} which throws all checked exceptions as unchecked
+	 */
+	public static <A> Consumer<A> uncheckedConsumer(ThrowingConsumer<A, ?> throwingConsumer) {
+		return (A value) -> {
+			try {
+				throwingConsumer.accept(value);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
+	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
index 4fef420..0dd4047 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.util.function;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
 
 /**
  * Similar to a {@link Runnable}, this interface is used to capture a block of code
@@ -35,4 +36,21 @@ public interface ThrowingRunnable<E extends Throwable> {
 	 * @throws E Exceptions may be thrown.
 	 */
 	void run() throws E;
+
+	/**
+	 * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions
+	 * as unchecked.
+	 *
+	 * @param throwingRunnable to convert into a {@link Runnable}
+	 * @return {@link Runnable} which throws all checked exceptions as unchecked.
+	 */
+	static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
+		return () -> {
+			try {
+				throwingRunnable.run();
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 1317339..e443fc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
@@ -246,7 +246,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		LOG.debug("Added {} to {}.", checkpoint, path);
 	}
 
-	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> discardCallback) {
+	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) {
 		try {
 			if (tryRemove(completedCheckpoint.getCheckpointID())) {
 				executor.execute(() -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4986f1d..5279e50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -64,8 +64,9 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.BiFunctionWithException;
-import org.apache.flink.util.function.ConsumerWithException;
 import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -849,7 +850,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		}
 	}
 
-	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
+	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
 		final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
 			.exceptionally((Throwable throwable) -> {
 				throw new CompletionException(
@@ -858,10 +859,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 						throwable)); });
 
 		return jobManagerTerminationFuture.thenRunAsync(
-			() -> {
+			ThrowingRunnable.unchecked(() -> {
 				jobManagerTerminationFutures.remove(jobId);
 				action.accept(jobGraph);
-			},
+			}),
 			getMainThreadExecutor());
 	}
 
@@ -934,11 +935,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 					final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
 						(Optional<JobGraph> jobGraphOptional) -> jobGraphOptional.map(
 							FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
-								(ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {
+								FunctionUtils.uncheckedConsumer((Boolean isRecoveredJobRunning) -> {
 										if (!isRecoveredJobRunning) {
 											submittedJobGraphStore.releaseJobGraph(jobId);
 										}
-									},
+									}),
 									getRpcService().getExecutor())))
 							.orElse(CompletableFuture.completedFuture(null)),
 						getUnfencedMainThreadExecutor());


[flink] 07/09: [hotfix] Add --host config option to ClusterEntrypoint

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31782136ea2876518519d1fbe8a56dbd07aac328
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 20:41:32 2018 +0200

    [hotfix] Add --host config option to ClusterEntrypoint
    
    This is necessary to support the command-line syntax used by the multi-master
    standalone start-up scripts.
---
 .../flink/runtime/entrypoint/ClusterConfiguration.java      | 13 ++++++++++++-
 .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java  | 11 ++++++++++-
 .../org/apache/flink/runtime/rest/RestServerEndpoint.java   |  1 +
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
index 7f8b509..d53647d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 /**
  * Configuration class which contains the parsed command line arguments for
  * the {@link ClusterEntrypoint}.
@@ -29,8 +31,12 @@ public class ClusterConfiguration {
 
 	private final int restPort;
 
-	public ClusterConfiguration(String configDir, int restPort) {
+	@Nullable
+	private final String hostname;
+
+	public ClusterConfiguration(String configDir, @Nullable String hostname, int restPort) {
 		this.configDir = Preconditions.checkNotNull(configDir);
+		this.hostname = hostname;
 		this.restPort = restPort;
 	}
 
@@ -38,6 +44,11 @@ public class ClusterConfiguration {
 		return configDir;
 	}
 
+	@Nullable
+	public String getHostname() {
+		return hostname;
+	}
+
 	public int getRestPort() {
 		return restPort;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index a267abb..c32be81 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -702,7 +702,10 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			restPort = -1;
 		}
 
-		return new ClusterConfiguration(configDir, restPort);
+		final String hostKey = "host";
+		final String hostname = parameterTool.get(hostKey);
+
+		return new ClusterConfiguration(configDir, hostname, restPort);
 	}
 
 	protected static Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
@@ -714,6 +717,12 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			configuration.setInteger(RestOptions.PORT, restPort);
 		}
 
+		final String hostname = clusterConfiguration.getHostname();
+
+		if (hostname != null) {
+			configuration.setString(JobManagerOptions.ADDRESS, hostname);
+		}
+
 		return configuration;
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 01d1043..3e85271 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -180,6 +180,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 				.channel(NioServerSocketChannel.class)
 				.childHandler(initializer);
 
+			log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
 			final ChannelFuture channel;
 			if (restBindAddress == null) {
 				channel = bootstrap.bind(restBindPort);


[flink] 09/09: [FLINK-10328] Release all locks when stopping the ZooKeeperSubmittedJobGraphStore

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fea8959ce1aeba11a2ecf978f3d19447de847cfd
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 12 17:47:59 2018 +0200

    [FLINK-10328] Release all locks when stopping the ZooKeeperSubmittedJobGraphStore
    
    When the ZooKeeperSubmittedJobGraphStore is stopped, it releases all currently held
    locks so that other instances can remove entries from the store. This is necessary
    if the used CuratorFramework/ZooKeeper client is not closed immediately.
    
    This closes #6686.
---
 .../jobmanager/ZooKeeperSubmittedJobGraphStore.java | 21 ++++++++++++++++++---
 .../ZooKeeperSubmittedJobGraphStoreTest.java        | 12 +++++++++++-
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 343b22a..2fd19fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -146,9 +147,23 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 				jobGraphListener = null;
 
 				try {
-					pathCache.close();
-				} catch (Exception e) {
-					throw new Exception("Could not properly stop the ZooKeeperSubmittedJobGraphStore.", e);
+					Exception exception = null;
+
+					try {
+						jobGraphsInZooKeeper.releaseAll();
+					} catch (Exception e) {
+						exception = e;
+					}
+
+					try {
+						pathCache.close();
+					} catch (Exception e) {
+						exception = ExceptionUtils.firstOrSuppressed(e, exception);
+					}
+
+					if (exception != null) {
+						throw new FlinkException("Could not properly stop the ZooKeeperSubmittedJobGraphStore.", exception);
+					}
 				} finally {
 					isRunning = false;
 				}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
index dde3b7a..fae8459 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
@@ -35,6 +35,7 @@ import javax.annotation.Nonnull;
 
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -57,9 +58,11 @@ public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger {
 	/**
 	 * Tests that we fail with an exception if the job cannot be removed from the
 	 * ZooKeeperSubmittedJobGraphStore.
+	 *
+	 * <p>Tests that a close ZooKeeperSubmittedJobGraphStore no longer holds any locks.
 	 */
 	@Test
-	public void testJobGraphRemovalFailure() throws Exception {
+	public void testJobGraphRemovalFailureAndLockRelease() throws Exception {
 		try (final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) {
 			final TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = new TestingRetrievableStateStorageHelper<>();
 			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
@@ -81,6 +84,13 @@ public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger {
 				// expected
 			}
 
+			submittedJobGraphStore.stop();
+
+			// now we should be able to delete the job graph
+			otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobId());
+
+			assertThat(otherSubmittedJobGraphStore.recoverJobGraph(recoveredJobGraph.getJobId()), is(nullValue()));
+
 			otherSubmittedJobGraphStore.stop();
 		}
 	}