You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:08 UTC

[flink] 05/09: [hotfix] Add BiFunctionWithException#unchecked to convert into BiFunction

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6feabbcd28adde4dd29754bcee2e72f45d3c9036
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 10:19:16 2018 +0200

    [hotfix] Add BiFunctionWithException#unchecked to convert into BiFunction
---
 .../util/function/BiFunctionWithException.java     | 31 ++++++++++++++--------
 .../flink/runtime/dispatcher/Dispatcher.java       |  4 +--
 .../testutils/InMemorySubmittedJobGraphStore.java  |  2 +-
 3 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
index 967c737..ccba8a7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
@@ -31,7 +31,7 @@ import java.util.function.BiFunction;
  * @param <E> type of the exception which can be thrown
  */
 @FunctionalInterface
-public interface BiFunctionWithException<T, U, R, E extends Throwable> extends BiFunction<T, U, R> {
+public interface BiFunctionWithException<T, U, R, E extends Throwable> {
 
 	/**
 	 * Apply the given values t and u to obtain the resulting value. The operation can
@@ -42,16 +42,25 @@ public interface BiFunctionWithException<T, U, R, E extends Throwable> extends B
 	 * @return result value
 	 * @throws E if the operation fails
 	 */
-	R applyWithException(T t, U u) throws E;
+	R apply(T t, U u) throws E;
 
-	default R apply(T t, U u) {
-		try {
-			return applyWithException(t, u);
-		} catch (Throwable e) {
-			ExceptionUtils.rethrow(e);
-			// we have to return a value to please the compiler
-			// but we will never reach the code here
-			return null;
-		}
+	/**
+	 * Convert a {@link BiFunctionWithException} into a {@link BiFunction}.
+	 *
+	 * @param biFunctionWithException function with exception to convert into a function
+	 * @param <A> first input type
+	 * @param <B> second input type
+	 * @return {@link BiFunction} which rethrows every checked exception as an unchecked exception.
+	 */
+	static <A, B, C> BiFunction<A, B, C> unchecked(BiFunctionWithException<A, B, C, ?> biFunctionWithException) {
+		return (A a, B b) -> {
+			try {
+				return biFunctionWithException.apply(a, b);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+				// we need this to appease the compiler :-(
+				return null;
+			}
+		};
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 40857aa..4986f1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -805,7 +805,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 				final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenCombineAsync(
 					recoveredJobsFuture,
-					(BiFunctionWithException<Boolean, Collection<JobGraph>, Void, Exception>) (Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
+					BiFunctionWithException.unchecked((Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
 						if (confirmLeadership) {
 							leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
 						} else {
@@ -814,7 +814,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 							}
 						}
 						return null;
-					},
+					}),
 					getRpcService().getExecutor());
 
 				confirmationFuture.whenComplete(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index 3b9c578..bf87515 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -76,7 +76,7 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
 		verifyIsStarted();
 
 		if (recoverJobGraphFunction != null) {
-			return recoverJobGraphFunction.applyWithException(jobId, storedJobs);
+			return recoverJobGraphFunction.apply(jobId, storedJobs);
 		} else {
 			return requireNonNull(
 				storedJobs.get(jobId),