You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/02 09:05:17 UTC
flink git commit: [FLINK-7323] [futures] Replace Flink's futures with
Java 8's CompletableFuture in MasterHooks
Repository: flink
Updated Branches:
refs/heads/master bfd7251a2 -> 9de270cb4
[FLINK-7323] [futures] Replace Flink's futures with Java 8's CompletableFuture in MasterHooks
This closes #4437.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9de270cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9de270cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9de270cb
Branch: refs/heads/master
Commit: 9de270cb4dc89ddea3dde11e3215b0f68f47b420
Parents: bfd7251
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 19:11:31 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Aug 2 11:04:50 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/checkpoint/MasterTriggerRestoreHook.java | 5 +++--
.../apache/flink/runtime/checkpoint/hooks/MasterHooks.java | 6 +++---
.../checkpoint/CheckpointCoordinatorMasterHooksTest.java | 6 +++---
.../flink/runtime/checkpoint/hooks/MasterHooksTest.java | 4 ++--
.../streaming/graph/WithMasterCheckpointHookConfigTest.java | 7 ++++---
5 files changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
index e77ed57..026046f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
@@ -19,9 +19,10 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.runtime.concurrent.Future;
import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
@@ -90,7 +91,7 @@ public interface MasterTriggerRestoreHook<T> {
* @throws Exception Exceptions encountered when calling the hook will cause the checkpoint to abort.
*/
@Nullable
- Future<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
+ CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
/**
* This method is called by the checkpoint coordinator prior to restoring the state of a checkpoint.
http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 1851eb6..92504bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -36,6 +35,7 @@ import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
@@ -98,7 +98,7 @@ public class MasterHooks {
final SimpleVersionedSerializer<T> serializer = typedHook.createCheckpointDataSerializer();
// call the hook!
- final Future<T> resultFuture;
+ final CompletableFuture<T> resultFuture;
try {
resultFuture = typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
}
@@ -307,7 +307,7 @@ public class MasterHooks {
@Nullable
@Override
- public Future<T> triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception {
+ public CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception {
Executor wrappedExecutor = new Executor() {
@Override
public void execute(Runnable command) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index d6daa4e..5df5c58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -44,6 +43,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex;
@@ -140,13 +140,13 @@ public class CheckpointCoordinatorMasterHooksTest {
when(statefulHook1.getIdentifier()).thenReturn(id1);
when(statefulHook1.createCheckpointDataSerializer()).thenReturn(new StringSerializer());
when(statefulHook1.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class)))
- .thenReturn(FlinkCompletableFuture.completed(state1));
+ .thenReturn(CompletableFuture.completedFuture(state1));
final MasterTriggerRestoreHook<Long> statefulHook2 = mockGeneric(MasterTriggerRestoreHook.class);
when(statefulHook2.getIdentifier()).thenReturn(id2);
when(statefulHook2.createCheckpointDataSerializer()).thenReturn(new LongSerializer());
when(statefulHook2.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class)))
- .thenReturn(FlinkCompletableFuture.completed(state2));
+ .thenReturn(CompletableFuture.completedFuture(state2));
final MasterTriggerRestoreHook<Void> statelessHook = mockGeneric(MasterTriggerRestoreHook.class);
when(statelessHook.getIdentifier()).thenReturn("some-id");
http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
index f4270dd..3498a41 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
@@ -20,13 +20,13 @@ package org.apache.flink.runtime.checkpoint.hooks;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import javax.annotation.Nullable;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import static org.junit.Assert.assertEquals;
@@ -69,7 +69,7 @@ public class MasterHooksTest extends TestLogger {
@Nullable
@Override
- public Future<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+ public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
executor.execute(command);
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
index 5d606ee..2585ef5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook.Factory;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -36,6 +36,7 @@ import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import static java.util.Arrays.asList;
@@ -48,7 +49,7 @@ import static org.junit.Assert.assertTrue;
* configured in the job's checkpoint settings.
*/
@SuppressWarnings("serial")
-public class WithMasterCheckpointHookConfigTest {
+public class WithMasterCheckpointHookConfigTest extends TestLogger {
/**
* This test creates a program with 4 sources (2 with master hooks, 2 without).
@@ -115,7 +116,7 @@ public class WithMasterCheckpointHookConfigTest {
}
@Override
- public Future<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
+ public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
throw new UnsupportedOperationException();
}