You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:09 UTC

[flink] 06/09: [hotfix] Add ThrowingRunnable#unchecked and FunctionUtils#uncheckedConsumer

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7be3956d3e0ccd96d0f777f0fcf97080a1d63112
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 10:26:35 2018 +0200

    [hotfix] Add ThrowingRunnable#unchecked and FunctionUtils#uncheckedConsumer
    
    ThrowingRunnable#unchecked converts a ThrowingRunnable into a Runnable which throws checked
    exceptions as unchecked ones. FunctionUtils#uncheckedConsumer(ThrowingConsumer) converts a
    ThrowingConsumer into a Consumer which throws checked exceptions as unchecked ones. This is
    necessary because ThrowingConsumer is public and we cannot add new methods to the interface.
---
 .../flink/util/function/ConsumerWithException.java | 43 ----------------------
 .../apache/flink/util/function/FunctionUtils.java  | 19 ++++++++++
 .../flink/util/function/ThrowingRunnable.java      | 18 +++++++++
 .../ZooKeeperCompletedCheckpointStore.java         |  4 +-
 .../flink/runtime/dispatcher/Dispatcher.java       | 13 ++++---
 5 files changed, 46 insertions(+), 51 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
deleted file mode 100644
index 09507d4..0000000
--- a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util.function;
-
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.function.Consumer;
-
-/**
- * A checked extension of the {@link Consumer} interface.
- *
- * @param <T> type of the first argument
- * @param <E> type of the thrown exception
- */
-public interface ConsumerWithException<T, E extends Throwable> extends Consumer<T> {
-
-	void acceptWithException(T value) throws E;
-
-	@Override
-	default void accept(T value) {
-		try {
-			acceptWithException(value);
-		} catch (Throwable t) {
-			ExceptionUtils.rethrow(t);
-		}
-	}
-}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
index c15ece1..678ef9f 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.util.function;
 
 import org.apache.flink.util.ExceptionUtils;
 
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -50,4 +51,22 @@ public class FunctionUtils {
 			}
 		};
 	}
+
+	/**
+	 * Converts a {@link ThrowingConsumer} into a {@link Consumer} which throws checked exceptions
+	 * as unchecked.
+	 *
+	 * @param throwingConsumer to convert into a {@link Consumer}
+	 * @param <A> input type
+	 * @return {@link Consumer} which throws all checked exceptions as unchecked
+	 */
+	public static <A> Consumer<A> uncheckedConsumer(ThrowingConsumer<A, ?> throwingConsumer) {
+		return (A value) -> {
+			try {
+				throwingConsumer.accept(value);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
+	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
index 4fef420..0dd4047 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.util.function;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
 
 /**
  * Similar to a {@link Runnable}, this interface is used to capture a block of code
@@ -35,4 +36,21 @@ public interface ThrowingRunnable<E extends Throwable> {
 	 * @throws E Exceptions may be thrown.
 	 */
 	void run() throws E;
+
+	/**
+	 * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions
+	 * as unchecked.
+	 *
+	 * @param throwingRunnable to convert into a {@link Runnable}
+	 * @return {@link Runnable} which throws all checked exceptions as unchecked.
+	 */
+	static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
+		return () -> {
+			try {
+				throwingRunnable.run();
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 1317339..e443fc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
@@ -246,7 +246,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		LOG.debug("Added {} to {}.", checkpoint, path);
 	}
 
-	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> discardCallback) {
+	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) {
 		try {
 			if (tryRemove(completedCheckpoint.getCheckpointID())) {
 				executor.execute(() -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4986f1d..5279e50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -64,8 +64,9 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.BiFunctionWithException;
-import org.apache.flink.util.function.ConsumerWithException;
 import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -849,7 +850,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		}
 	}
 
-	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
+	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
 		final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
 			.exceptionally((Throwable throwable) -> {
 				throw new CompletionException(
@@ -858,10 +859,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 						throwable)); });
 
 		return jobManagerTerminationFuture.thenRunAsync(
-			() -> {
+			ThrowingRunnable.unchecked(() -> {
 				jobManagerTerminationFutures.remove(jobId);
 				action.accept(jobGraph);
-			},
+			}),
 			getMainThreadExecutor());
 	}
 
@@ -934,11 +935,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 					final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
 						(Optional<JobGraph> jobGraphOptional) -> jobGraphOptional.map(
 							FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
-								(ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {
+								FunctionUtils.uncheckedConsumer((Boolean isRecoveredJobRunning) -> {
 										if (!isRecoveredJobRunning) {
 											submittedJobGraphStore.releaseJobGraph(jobId);
 										}
-									},
+									}),
 									getRpcService().getExecutor())))
 							.orElse(CompletableFuture.completedFuture(null)),
 						getUnfencedMainThreadExecutor());