You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/02 09:05:17 UTC

flink git commit: [FLINK-7323] [futures] Replace Flink's futures with Java 8's CompletableFuture in MasterHooks

Repository: flink
Updated Branches:
  refs/heads/master bfd7251a2 -> 9de270cb4


[FLINK-7323] [futures] Replace Flink's futures with Java 8's CompletableFuture in MasterHooks

This closes #4437.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9de270cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9de270cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9de270cb

Branch: refs/heads/master
Commit: 9de270cb4dc89ddea3dde11e3215b0f68f47b420
Parents: bfd7251
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 19:11:31 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Aug 2 11:04:50 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/MasterTriggerRestoreHook.java    | 5 +++--
 .../apache/flink/runtime/checkpoint/hooks/MasterHooks.java    | 6 +++---
 .../checkpoint/CheckpointCoordinatorMasterHooksTest.java      | 6 +++---
 .../flink/runtime/checkpoint/hooks/MasterHooksTest.java       | 4 ++--
 .../streaming/graph/WithMasterCheckpointHookConfigTest.java   | 7 ++++---
 5 files changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
index e77ed57..026046f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
@@ -19,9 +19,10 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.runtime.concurrent.Future;
 
 import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
@@ -90,7 +91,7 @@ public interface MasterTriggerRestoreHook<T> {
 	 * @throws Exception Exceptions encountered when calling the hook will cause the checkpoint to abort.
 	 */
 	@Nullable
-	Future<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
+	CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
 
 	/**
 	 * This method is called by the checkpoint coordinator prior to restoring the state of a checkpoint.

http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 1851eb6..92504bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -36,6 +35,7 @@ import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
@@ -98,7 +98,7 @@ public class MasterHooks {
 		final SimpleVersionedSerializer<T> serializer = typedHook.createCheckpointDataSerializer();
 
 		// call the hook!
-		final Future<T> resultFuture;
+		final CompletableFuture<T> resultFuture;
 		try {
 			resultFuture = typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
 		}
@@ -307,7 +307,7 @@ public class MasterHooks {
 
 		@Nullable
 		@Override
-		public Future<T> triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception {
+		public CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception {
 			Executor wrappedExecutor = new Executor() {
 				@Override
 				public void execute(Runnable command) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index d6daa4e..5df5c58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -44,6 +43,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex;
@@ -140,13 +140,13 @@ public class CheckpointCoordinatorMasterHooksTest {
 		when(statefulHook1.getIdentifier()).thenReturn(id1);
 		when(statefulHook1.createCheckpointDataSerializer()).thenReturn(new StringSerializer());
 		when(statefulHook1.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class)))
-				.thenReturn(FlinkCompletableFuture.completed(state1));
+				.thenReturn(CompletableFuture.completedFuture(state1));
 
 		final MasterTriggerRestoreHook<Long> statefulHook2 = mockGeneric(MasterTriggerRestoreHook.class);
 		when(statefulHook2.getIdentifier()).thenReturn(id2);
 		when(statefulHook2.createCheckpointDataSerializer()).thenReturn(new LongSerializer());
 		when(statefulHook2.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class)))
-				.thenReturn(FlinkCompletableFuture.completed(state2));
+				.thenReturn(CompletableFuture.completedFuture(state2));
 
 		final MasterTriggerRestoreHook<Void> statelessHook = mockGeneric(MasterTriggerRestoreHook.class);
 		when(statelessHook.getIdentifier()).thenReturn("some-id");

http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
index f4270dd..3498a41 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
@@ -20,13 +20,13 @@ package org.apache.flink.runtime.checkpoint.hooks;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
@@ -69,7 +69,7 @@ public class MasterHooksTest extends TestLogger {
 
 			@Nullable
 			@Override
-			public Future<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+			public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
 				assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
 				executor.execute(command);
 				return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/9de270cb/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
index 5d606ee..2585ef5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook.Factory;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -36,6 +36,7 @@ import javax.annotation.Nullable;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static java.util.Arrays.asList;
@@ -48,7 +49,7 @@ import static org.junit.Assert.assertTrue;
  * configured in the job's checkpoint settings.
  */
 @SuppressWarnings("serial")
-public class WithMasterCheckpointHookConfigTest {
+public class WithMasterCheckpointHookConfigTest extends TestLogger {
 
 	/**
 	 * This test creates a program with 4 sources (2 with master hooks, 2 without).
@@ -115,7 +116,7 @@ public class WithMasterCheckpointHookConfigTest {
 		}
 
 		@Override
-		public Future<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
+		public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
 			throw new UnsupportedOperationException();
 		}