You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:08 UTC
[flink] 05/09: [hotfix] Add BiFunctionWithException#unchecked to
convert into BiFunction
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6feabbcd28adde4dd29754bcee2e72f45d3c9036
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 10:19:16 2018 +0200
[hotfix] Add BiFunctionWithException#unchecked to convert into BiFunction
---
.../util/function/BiFunctionWithException.java | 31 ++++++++++++++--------
.../flink/runtime/dispatcher/Dispatcher.java | 4 +--
.../testutils/InMemorySubmittedJobGraphStore.java | 2 +-
3 files changed, 23 insertions(+), 14 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
index 967c737..ccba8a7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
@@ -31,7 +31,7 @@ import java.util.function.BiFunction;
* @param <E> type of the exception which can be thrown
*/
@FunctionalInterface
-public interface BiFunctionWithException<T, U, R, E extends Throwable> extends BiFunction<T, U, R> {
+public interface BiFunctionWithException<T, U, R, E extends Throwable> {
/**
* Apply the given values t and u to obtain the resulting value. The operation can
@@ -42,16 +42,25 @@ public interface BiFunctionWithException<T, U, R, E extends Throwable> extends B
* @return result value
* @throws E if the operation fails
*/
- R applyWithException(T t, U u) throws E;
+ R apply(T t, U u) throws E;
- default R apply(T t, U u) {
- try {
- return applyWithException(t, u);
- } catch (Throwable e) {
- ExceptionUtils.rethrow(e);
- // we have to return a value to please the compiler
- // but we will never reach the code here
- return null;
- }
+ /**
+ * Convert a {@link BiFunctionWithException} into a {@link BiFunction}.
+ *
+ * @param biFunctionWithException function with exception to convert into a function
+ * @param <A> first input type
+ * @param <B> second input type; {@code <C>} is the output type
+ * @return {@link BiFunction} which rethrows all checked exceptions as unchecked exceptions.
+ */
+ static <A, B, C> BiFunction<A, B, C> unchecked(BiFunctionWithException<A, B, C, ?> biFunctionWithException) {
+ return (A a, B b) -> {
+ try {
+ return biFunctionWithException.apply(a, b);
+ } catch (Throwable t) {
+ ExceptionUtils.rethrow(t);
+ // we need this to appease the compiler :-(
+ return null;
+ }
+ };
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 40857aa..4986f1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -805,7 +805,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenCombineAsync(
recoveredJobsFuture,
- (BiFunctionWithException<Boolean, Collection<JobGraph>, Void, Exception>) (Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
+ BiFunctionWithException.unchecked((Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
if (confirmLeadership) {
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
} else {
@@ -814,7 +814,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
}
return null;
- },
+ }),
getRpcService().getExecutor());
confirmationFuture.whenComplete(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index 3b9c578..bf87515 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -76,7 +76,7 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
verifyIsStarted();
if (recoverJobGraphFunction != null) {
- return recoverJobGraphFunction.applyWithException(jobId, storedJobs);
+ return recoverJobGraphFunction.apply(jobId, storedJobs);
} else {
return requireNonNull(
storedJobs.get(jobId),