You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:07 UTC

[flink] 04/09: [hotfix] Add BiConsumerWithException#unchecked to convert into BiConsumer

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4942b957dc382a5f7c3c52efd308728b0fafcdeb
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 10:15:37 2018 +0200

    [hotfix] Add BiConsumerWithException#unchecked to convert into BiConsumer
---
 .../util/function/BiConsumerWithException.java     | 27 ++++++++++++++--------
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  2 +-
 .../runtime/jobmaster/RescalingBehaviour.java      |  4 ++--
 3 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
index 5864c8a..6fc5b76 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -30,7 +30,7 @@ import java.util.function.BiConsumer;
  * @param <E> type of the thrown exception
  */
 @FunctionalInterface
-public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T, U> {
+public interface BiConsumerWithException<T, U, E extends Throwable> {
 
 	/**
 	 * Performs this operation on the given arguments.
@@ -39,14 +39,23 @@ public interface BiConsumerWithException<T, U, E extends Throwable> extends BiCo
 	 * @param u the second input argument
 	 * @throws E in case of an error
 	 */
-	void acceptWithException(T t, U u) throws E;
+	void accept(T t, U u) throws E;
 
-	@Override
-	default void accept(T t, U u) {
-		try {
-			acceptWithException(t, u);
-		} catch (Throwable e) {
-			ExceptionUtils.rethrow(e);
-		}
+	/**
+	 * Convert a {@link BiConsumerWithException} into a {@link BiConsumer}.
+	 *
+	 * @param biConsumerWithException BiConsumer with exception to convert into a {@link BiConsumer}.
+	 * @param <A> first input type
+	 * @param <B> second input type
+	 * @return {@link BiConsumer} which rethrows all checked exceptions as unchecked.
+	 */
+	static <A, B> BiConsumer<A, B> unchecked(BiConsumerWithException<A, B, ?> biConsumerWithException) {
+		return (A a, B b) -> {
+			try {
+				biConsumerWithException.accept(a, b);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 4a66d32..736984e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1483,7 +1483,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 				jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
 			}
 
-			rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+			rescalingBehaviour.accept(jobVertex, newParallelism);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
index 7de9560..64e2ffa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
@@ -29,7 +29,7 @@ public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Int
 	// rescaling is only executed if the operator can be set to the given parallelism
 	STRICT {
 		@Override
-		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
+		public void accept(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
 			if (jobVertex.getMaxParallelism() < newParallelism) {
 				throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() +
 					" because its maximum parallelism " + jobVertex.getMaxParallelism() +
@@ -42,7 +42,7 @@ public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Int
 	// the new parallelism will be the minimum of the given parallelism and the maximum parallelism
 	RELAXED {
 		@Override
-		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) {
+		public void accept(JobVertex jobVertex, Integer newParallelism) {
 			jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), newParallelism));
 		}
 	}