You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:07 UTC
[flink] 04/09: [hotfix] Add BiConsumerWithException#unchecked to convert into BiConsumer
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4942b957dc382a5f7c3c52efd308728b0fafcdeb
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 10:15:37 2018 +0200
[hotfix] Add BiConsumerWithException#unchecked to convert into BiConsumer
---
.../util/function/BiConsumerWithException.java | 27 ++++++++++++++--------
.../apache/flink/runtime/jobmaster/JobMaster.java | 2 +-
.../runtime/jobmaster/RescalingBehaviour.java | 4 ++--
3 files changed, 21 insertions(+), 12 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
index 5864c8a..6fc5b76 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -30,7 +30,7 @@ import java.util.function.BiConsumer;
* @param <E> type of the thrown exception
*/
@FunctionalInterface
-public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T, U> {
+public interface BiConsumerWithException<T, U, E extends Throwable> {
/**
* Performs this operation on the given arguments.
@@ -39,14 +39,23 @@ public interface BiConsumerWithException<T, U, E extends Throwable> extends BiCo
* @param u the second input argument
* @throws E in case of an error
*/
- void acceptWithException(T t, U u) throws E;
+ void accept(T t, U u) throws E;
- @Override
- default void accept(T t, U u) {
- try {
- acceptWithException(t, u);
- } catch (Throwable e) {
- ExceptionUtils.rethrow(e);
- }
+ /**
+ * Convert a {@link BiConsumerWithException} into a {@link BiConsumer}.
+ *
+ * @param biConsumerWithException BiConsumer with exception to convert into a {@link BiConsumer}.
+ * @param <A> first input type
+ * @param <B> second input type
+ * @return {@link BiConsumer} which rethrows all checked exceptions as unchecked.
+ */
+ static <A, B> BiConsumer<A, B> unchecked(BiConsumerWithException<A, B, ?> biConsumerWithException) {
+ return (A a, B b) -> {
+ try {
+ biConsumerWithException.accept(a, b);
+ } catch (Throwable t) {
+ ExceptionUtils.rethrow(t);
+ }
+ };
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 4a66d32..736984e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1483,7 +1483,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
}
- rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+ rescalingBehaviour.accept(jobVertex, newParallelism);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
index 7de9560..64e2ffa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
@@ -29,7 +29,7 @@ public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Int
// rescaling is only executed if the operator can be set to the given parallelism
STRICT {
@Override
- public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
+ public void accept(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
if (jobVertex.getMaxParallelism() < newParallelism) {
throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() +
" because its maximum parallelism " + jobVertex.getMaxParallelism() +
@@ -42,7 +42,7 @@ public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Int
// the new parallelism will be the minimum of the given parallelism and the maximum parallelism
RELAXED {
@Override
- public void acceptWithException(JobVertex jobVertex, Integer newParallelism) {
+ public void accept(JobVertex jobVertex, Integer newParallelism) {
jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), newParallelism));
}
}