You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:09 UTC
[flink] 06/09: [hotfix] Add ThrowingRunnable#unchecked and
FunctionUtils#uncheckedConsumer
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7be3956d3e0ccd96d0f777f0fcf97080a1d63112
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 10:26:35 2018 +0200
[hotfix] Add ThrowingRunnable#unchecked and FunctionUtils#uncheckedConsumer
ThrowingRunnable#unchecked converts a ThrowingRunnable into a Runnable which throws checked
exceptions as unchecked ones. FunctionUtils#uncheckedConsumer(ThrowingConsumer) converts a
ThrowingConsumer into a Consumer which throws checked exceptions as unchecked ones. This is
necessary because ThrowingConsumer is public and we cannot add new methods to the interface.
---
.../flink/util/function/ConsumerWithException.java | 43 ----------------------
.../apache/flink/util/function/FunctionUtils.java | 19 ++++++++++
.../flink/util/function/ThrowingRunnable.java | 18 +++++++++
.../ZooKeeperCompletedCheckpointStore.java | 4 +-
.../flink/runtime/dispatcher/Dispatcher.java | 13 ++++---
5 files changed, 46 insertions(+), 51 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
deleted file mode 100644
index 09507d4..0000000
--- a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util.function;
-
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.function.Consumer;
-
-/**
- * A checked extension of the {@link Consumer} interface.
- *
- * @param <T> type of the first argument
- * @param <E> type of the thrown exception
- */
-public interface ConsumerWithException<T, E extends Throwable> extends Consumer<T> {
-
- void acceptWithException(T value) throws E;
-
- @Override
- default void accept(T value) {
- try {
- acceptWithException(value);
- } catch (Throwable t) {
- ExceptionUtils.rethrow(t);
- }
- }
-}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
index c15ece1..678ef9f 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.util.function;
import org.apache.flink.util.ExceptionUtils;
+import java.util.function.Consumer;
import java.util.function.Function;
/**
@@ -50,4 +51,22 @@ public class FunctionUtils {
}
};
}
+
+ /**
+ * Converts a {@link ThrowingConsumer} into a {@link Consumer} which throws checked exceptions
+ * as unchecked.
+ *
+ * @param throwingConsumer to convert into a {@link Consumer}
+ * @param <A> input type
+ * @return {@link Consumer} which throws all checked exceptions as unchecked
+ */
+ public static <A> Consumer<A> uncheckedConsumer(ThrowingConsumer<A, ?> throwingConsumer) {
+ return (A value) -> {
+ try {
+ throwingConsumer.accept(value);
+ } catch (Throwable t) {
+ ExceptionUtils.rethrow(t);
+ }
+ };
+ }
}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
index 4fef420..0dd4047 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
@@ -19,6 +19,7 @@
package org.apache.flink.util.function;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
/**
* Similar to a {@link Runnable}, this interface is used to capture a block of code
@@ -35,4 +36,21 @@ public interface ThrowingRunnable<E extends Throwable> {
* @throws E Exceptions may be thrown.
*/
void run() throws E;
+
+ /**
+ * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions
+ * as unchecked.
+ *
+ * @param throwingRunnable to convert into a {@link Runnable}
+ * @return {@link Runnable} which throws all checked exceptions as unchecked.
+ */
+ static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
+ return () -> {
+ try {
+ throwingRunnable.run();
+ } catch (Throwable t) {
+ ExceptionUtils.rethrow(t);
+ }
+ };
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 1317339..e443fc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
@@ -246,7 +246,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
LOG.debug("Added {} to {}.", checkpoint, path);
}
- private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> discardCallback) {
+ private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) {
try {
if (tryRemove(completedCheckpoint.getCheckpointID())) {
executor.execute(() -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4986f1d..5279e50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -64,8 +64,9 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiFunctionWithException;
-import org.apache.flink.util.function.ConsumerWithException;
import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -849,7 +850,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
}
- private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
+ private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
.exceptionally((Throwable throwable) -> {
throw new CompletionException(
@@ -858,10 +859,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
throwable)); });
return jobManagerTerminationFuture.thenRunAsync(
- () -> {
+ ThrowingRunnable.unchecked(() -> {
jobManagerTerminationFutures.remove(jobId);
action.accept(jobGraph);
- },
+ }),
getMainThreadExecutor());
}
@@ -934,11 +935,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
(Optional<JobGraph> jobGraphOptional) -> jobGraphOptional.map(
FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
- (ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {
+ FunctionUtils.uncheckedConsumer((Boolean isRecoveredJobRunning) -> {
if (!isRecoveredJobRunning) {
submittedJobGraphStore.releaseJobGraph(jobId);
}
- },
+ }),
getRpcService().getExecutor())))
.orElse(CompletableFuture.completedFuture(null)),
getUnfencedMainThreadExecutor());