You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/27 16:40:31 UTC

[1/3] flink git commit: [FLINK-4690] Use direct executor to run slot allocation future handler

Repository: flink
Updated Branches:
  refs/heads/master 6e123d287 -> 84672c22f


[FLINK-4690] Use direct executor to run slot allocation future handler


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84672c22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84672c22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84672c22

Branch: refs/heads/master
Commit: 84672c22f8088a70caf35b54d74eee458bf600dd
Parents: 7b88f1a
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Sep 27 15:33:07 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 27 18:39:36 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/Executors.java     | 52 +++++++++++++++++
 .../flink/runtime/executiongraph/Execution.java | 61 ++++++++------------
 .../runtime/jobmanager/scheduler/Scheduler.java | 15 +++--
 3 files changed, 84 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
new file mode 100644
index 0000000..1832d70
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Collection of {@link Executor} implementations
+ */
+public class Executors {
+
+	/**
+	 * Return a direct executor. The direct executor directly executes the runnable in the calling
+	 * thread.
+	 *
+	 * @return Direct executor
+	 */
+	public static Executor directExecutor() {
+		return DirectExecutor.INSTANCE;
+	}
+
+	/**
+	 * Direct executor implementation.
+	 */
+	private static class DirectExecutor implements Executor {
+
+		static final DirectExecutor INSTANCE = new DirectExecutor();
+
+		private DirectExecutor() {}
+
+		@Override
+		public void execute(Runnable command) {
+			command.run();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 8c02e1b..912ff10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -52,7 +53,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 
 import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
@@ -297,49 +297,38 @@ public class Execution {
 
 			// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
 			//     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
-			final Future<SimpleSlot> future = slotProvider.allocateSlot(toSchedule, queued);
+			final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued);
 
-			if (queued) {
-				future.handleAsync(new BiFunction<SimpleSlot, Throwable, Void>() {
-					@Override
-					public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
-						if (simpleSlot != null) {
+			// IMPORTANT: We have to use the direct executor here so that we directly deploy the tasks
+			// if the slot allocation future is completed. This is necessary for immediate deployment
+			final Future<Void> deploymentFuture = slotAllocationFuture.handleAsync(new BiFunction<SimpleSlot, Throwable, Void>() {
+				@Override
+				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+					if (simpleSlot != null) {
+						try {
+							deployToSlot(simpleSlot);
+						} catch (Throwable t) {
 							try {
-								deployToSlot(simpleSlot);
-							} catch (Throwable t) {
-								try {
-									simpleSlot.releaseSlot();
-								} finally {
-									markFailed(t);
-								}
+								simpleSlot.releaseSlot();
+							} finally {
+								markFailed(t);
 							}
 						}
-						else {
-							markFailed(throwable);
-						}
-						return null;
 					}
-				}, ExecutionContext$.MODULE$.global());
-			}
-			else {
-				SimpleSlot slot = null;
-				try {
-					// when queued is not allowed, we will get a slot or NoResourceAvailableException will be
-					// thrown earlier (when allocateSlot).
-					slot = checkNotNull(future.getNow(null));
-					deployToSlot(slot);
-				}
-				catch (Throwable t) {
-					try {
-						if (slot != null) {
-							slot.releaseSlot();
-						}
-					} finally {
-						markFailed(t);
+					else {
+						markFailed(throwable);
 					}
+					return null;
 				}
-			}
+			}, Executors.directExecutor());
 
+			// if tasks have to be scheduled immediately, check that the task has been deployed
+			if (!queued) {
+				if (!deploymentFuture.isDone()) {
+					markFailed(new IllegalArgumentException("The slot allocation future has not been completed yet."));
+				}
+			}
+			
 			return true;
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index ce2f6f7..b839e0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -140,9 +141,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 		final Object ret = scheduleTask(task, allowQueued);
 		if (ret instanceof SimpleSlot) {
-			FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
-			future.complete((SimpleSlot) ret);
-			return future;
+			return FlinkCompletableFuture.completed((SimpleSlot) ret);
 		}
 		else if (ret instanceof Future) {
 			return (Future) ret;
@@ -153,7 +152,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 	}
 
 	/**
-	 * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link Future}.
+	 * Returns either a {@link SimpleSlot}, or a {@link Future}.
 	 */
 	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
 		if (task == null) {
@@ -316,7 +315,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				else {
 					// no resource available now, so queue the request
 					if (queueIfNoResource) {
-						FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+						CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
 						this.taskQueue.add(new QueuedTask(task, future));
 						return future;
 					}
@@ -833,10 +832,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 		
 		private final ScheduledUnit task;
 		
-		private final FlinkCompletableFuture<SimpleSlot> future;
+		private final CompletableFuture<SimpleSlot> future;
 		
 		
-		public QueuedTask(ScheduledUnit task, FlinkCompletableFuture<SimpleSlot> future) {
+		public QueuedTask(ScheduledUnit task, CompletableFuture<SimpleSlot> future) {
 			this.task = task;
 			this.future = future;
 		}
@@ -845,7 +844,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 			return task;
 		}
 
-		public FlinkCompletableFuture<SimpleSlot> getFuture() {
+		public CompletableFuture<SimpleSlot> getFuture() {
 			return future;
 		}
 	}


[3/3] flink git commit: [FLINK-4361] Introduce Flink's own future abstraction

Posted by tr...@apache.org.
[FLINK-4361] Introduce Flink's own future abstraction

This adds Flink's own future abstraction, whose API is similar to Java 8's CompletableFuture,
in order to ease a later transition to Java 8's CompletableFuture once we ditch Java 7.
The current set of operations comprises:

- isDone to check the completion of the future
- get/getNow to obtain the future's value
- cancel to cancel the future (best effort basis)
- thenApplyAsync to transform the future's value into another value
- thenAcceptAsync to register a callback for a successful completion of the future
- exceptionallyAsync to register a callback for an exception completion of the future
- thenComposeAsync to transform the future's value and flatten the returned future
- handleAsync to register a callback which is called either with the regular result
or the exceptional result

Additionally, Flink offers a CompletableFuture which can be completed with a regular
value or an exception:

- complete/completeExceptionally

Complete FlinkCompletableFuture exceptionally with a CancellationException upon cancel

Add convenience functions for FlinkCompletableFutures

This closes #2554.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8138f4b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8138f4b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8138f4b

Branch: refs/heads/master
Commit: f8138f4b74332ecb4ef0d28a09e8549708118ca6
Parents: 6e123d2
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Sep 2 21:13:34 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 27 18:39:36 2016 +0200

----------------------------------------------------------------------
 .../runtime/concurrent/AcceptFunction.java      |  34 +++
 .../flink/runtime/concurrent/ApplyFunction.java |  36 +++
 .../flink/runtime/concurrent/BiFunction.java    |  38 +++
 .../runtime/concurrent/CompletableFuture.java   |  47 ++++
 .../apache/flink/runtime/concurrent/Future.java | 156 +++++++++++
 .../concurrent/impl/FlinkCompletableFuture.java |  91 +++++++
 .../runtime/concurrent/impl/FlinkFuture.java    | 273 +++++++++++++++++++
 .../runtime/concurrent/FlinkFutureTest.java     | 269 ++++++++++++++++++
 8 files changed, 944 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f8138f4b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
new file mode 100644
index 0000000..a300647
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+/**
+ * Function which is called with a single argument and does not return a value.
+ *
+ * @param <T> type of the argument
+ */
+public interface AcceptFunction<T> {
+
+	/**
+	 * Method which handles the function call.
+	 *
+	 * @param value is the function's argument
+	 */
+	void accept(T value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8138f4b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
new file mode 100644
index 0000000..64def98
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+/**
+ * Function which is called with a single argument.
+ *
+ * @param <V> type of the argument
+ * @param <R> type of the return value
+ */
+public interface ApplyFunction<V, R> {
+
+	/**
+	 * Method which handles the function call.
+	 *
+	 * @param value is the single argument
+	 * @return the function value
+	 */
+	R apply(V value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8138f4b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java
new file mode 100644
index 0000000..2b09de8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+/**
+ * Function which is called with two arguments and returns a value.
+ *
+ * @param <T> type of the first argument
+ * @param <U> type of the second argument
+ * @param <R> type of the return value
+ */
+public interface BiFunction<T, U, R> {
+
+	/**
+	 * Method which handles the function call.
+	 *
+	 * @param t first argument
+	 * @param u second argument
+	 * @return the function value
+	 */
+	R apply(T t, U u);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8138f4b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java
new file mode 100644
index 0000000..5288bf2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+/**
+ * Flink's completable future abstraction. A completable future can be completed with a regular
+ * value or an exception.
+ *
+ * @param <T> type of the future's value
+ */
+public interface CompletableFuture<T> extends Future<T> {
+
+	/**
+	 * Completes the future with the given value. The complete operation only succeeds if the future
+	 * has not been completed before. Whether it is successful or not is returned by the method.
+	 *
+	 * @param value to complete the future with
+	 * @return true if the completion was successful; otherwise false
+	 */
+	boolean complete(T value);
+
+	/**
+	 * Completes the future with the given exception. The complete operation only succeeds if the
+	 * future has not been completed before. Whether it is successful or not is returned by the
+	 * method.
+	 *
+	 * @param t the exception to complete the future with
+	 * @return true if the completion was successful; otherwise false
+	 */
+	boolean completeExceptionally(Throwable t);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8138f4b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
new file mode 100644
index 0000000..b32bcd4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Flink's basic future abstraction. A future represents an asynchronous operation whose result
+ * will be contained in this instance upon completion.
+ *
+ * @param <T> type of the future's result
+ */
+public interface Future<T> {
+
+	/**
+	 * Checks if the future has been completed. A future is completed, if the result has been
+	 * delivered.
+	 *
+	 * @return true if the future is completed; otherwise false
+	 */
+	boolean isDone();
+
+	/**
+	 * Tries to cancel the future's operation. Note that not all future operations can be canceled.
+	 * The result of the cancelling will be returned.
+	 *
+	 * @param mayInterruptIfRunning true iff the future operation may be interrupted
+	 * @return true if the cancelling was successful; otherwise false
+	 */
+	boolean cancel(boolean mayInterruptIfRunning);
+
+	/**
+	 * Gets the result value of the future. If the future has not been completed, then this
+	 * operation will block indefinitely until the result has been delivered.
+	 *
+	 * @return the result value
+	 * @throws CancellationException if the future has been cancelled
+	 * @throws InterruptedException if the current thread was interrupted while waiting for the result
+	 * @throws ExecutionException if the future has been completed with an exception
+	 */
+	T get() throws InterruptedException, ExecutionException;
+
+	/**
+	 * Gets the result value of the future. If the future has not been completed, then this
+	 * operation blocks for at most the given timeout. If the result has not been delivered within
+	 * the timeout, then the method throws a {@link TimeoutException}.
+	 *
+	 * @param timeout the time to wait for the future to be done
+	 * @param unit time unit for the timeout argument
+	 * @return the result value
+	 * @throws CancellationException if the future has been cancelled
+	 * @throws InterruptedException if the current thread was interrupted while waiting for the result
+	 * @throws ExecutionException if the future has been completed with an exception
+	 * @throws TimeoutException if the future has not been completed within the given timeout
+	 */
+	T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
+
+	/**
+	 * Gets the value of the future. If the future has not been completed when calling this
+	 * function, the given value is returned.
+	 *
+	 * @param valueIfAbsent value which is returned if the future has not been completed
+	 * @return value of the future or the given value if the future has not been completed
+	 * @throws ExecutionException if the future has been completed with an exception
+	 */
+	T getNow(T valueIfAbsent) throws ExecutionException;
+
+	/**
+	 * Applies the given function to the value of the future. The result of the apply function is
+	 * the value of the newly returned future.
+	 * <p>
+	 * The apply function is executed asynchronously by the given executor.
+	 *
+	 * @param applyFunction function to apply to the future's value
+	 * @param executor used to execute the given apply function asynchronously
+	 * @param <R> type of the apply function's return value
+	 * @return future representing the return value of the given apply function
+	 */
+	<R> Future<R> thenApplyAsync(ApplyFunction<? super T, ? extends R> applyFunction, Executor executor);
+
+	/**
+	 * Applies the accept function to the value of the future. Unlike the {@link ApplyFunction}, the
+	 * {@link AcceptFunction} does not return a value. The returned future, thus, represents only
+	 * the completion of the accept callback.
+	 * <p>
+	 * The accept function is executed asynchronously by the given executor.
+	 *
+	 * @param acceptFunction function to apply to the future's value
+	 * @param executor used to execute the given apply function asynchronously
+	 * @return future representing the completion of the accept callback
+	 */
+	Future<Void> thenAcceptAsync(AcceptFunction<? super T> acceptFunction, Executor executor);
+
+	/**
+	 * Applies the given function to the value of the future if the future has been completed
+	 * exceptionally. The completing exception is given to the apply function which can return a new
+	 * value which is the value of the returned future.
+	 * <p>
+	 * The apply function is executed asynchronously by the given executor.
+	 *
+	 * @param exceptionallyFunction to apply to the future's value if it is an exception
+	 * @param executor used to execute the given apply function asynchronously
+	 * @param <R> type of the apply function's return value
+	 * @return future representing the return value of the given apply function
+	 */
+	<R> Future<R> exceptionallyAsync(ApplyFunction<Throwable, ? extends R> exceptionallyFunction, Executor executor);
+
+	/**
+	 * Applies the given function to the value of the future. The apply function returns a future
+	 * result, which is flattened. This means that the resulting future of this method represents
+	 * the future's value of the apply function.
+	 * <p>
+	 * The apply function is executed asynchronously by the given executor.
+	 *
+	 * @param composeFunction to apply to the future's value. The function returns a future which is
+	 *                        flattened
+	 * @param executor used to execute the given apply function asynchronously
+	 * @param <R> type of the returned future's value
+	 * @return future representing the flattened return value of the apply function
+	 */
+	<R> Future<R> thenComposeAsync(ApplyFunction<? super T, Future<? extends R>> composeFunction, Executor executor);
+
+	/**
+	 * Applies the given handle function to the result of the future. The result can either be the
+	 * future's value or the exception with which the future has been completed. The two cases are
+	 * mutually exclusive. The result of the handle function is the returned future's value.
+	 * <p>
+	 * The handle function is executed asynchronously by the given executor.
+	 *
+	 * @param biFunction applied to the result (normal and exceptional) of the future
+	 * @param executor used to execute the handle function asynchronously
+	 * @param <R> type of the handle function's return value
+	 * @return future representing the handle function's return value
+	 */
+	<R> Future<R> handleAsync(BiFunction<? super T, Throwable, ? extends R> biFunction, Executor executor);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8138f4b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
new file mode 100644
index 0000000..e648a71
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent.impl;
+
+import akka.dispatch.Futures;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Promise;
+import scala.concurrent.Promise$;
+
+import java.util.concurrent.CancellationException;
+
+/**
+ * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
+ *
+ * @param <T> type of the future's value
+ */
+public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
+
+	private final Promise<T> promise;
+
+	public FlinkCompletableFuture() {
+		promise = Futures.promise();
+		scalaFuture = promise.future();
+	}
+
+	private FlinkCompletableFuture(T value) {
+		promise = Promise$.MODULE$.successful(value);
+		scalaFuture = promise.future();
+	}
+
+	private FlinkCompletableFuture(Throwable t) {
+		promise = Promise$.MODULE$.failed(t);
+		scalaFuture = promise.future();
+	}
+
+	@Override
+	public boolean complete(T value) {
+		Preconditions.checkNotNull(value);
+
+		try {
+			promise.success(value);
+
+			return true;
+		} catch (IllegalStateException e) {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean completeExceptionally(Throwable t) {
+		Preconditions.checkNotNull(t);
+
+		try {
+			promise.failure(t);
+
+			return true;
+		} catch (IllegalStateException e) {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean cancel(boolean mayInterruptIfRunning) {
+		return completeExceptionally(new CancellationException("Future has been canceled."));
+	}
+
+	public static <T> FlinkCompletableFuture<T> completed(T value) {
+		return new FlinkCompletableFuture<>(value);
+	}
+
+	public static <T> FlinkCompletableFuture<T> completedExceptionally(Throwable t) {
+		return new FlinkCompletableFuture<>(t);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8138f4b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
new file mode 100644
index 0000000..361cd3d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent.impl;
+
+import akka.dispatch.ExecutionContexts$;
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.Recover;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.util.Preconditions;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+import scala.util.Failure;
+import scala.util.Success;
+import scala.util.Try;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implementation of {@link Future} which is backed by {@link scala.concurrent.Future}.
+ *
+ * @param <T> type of the future's value
+ */
+public class FlinkFuture<T> implements Future<T> {
+
+	protected scala.concurrent.Future<T> scalaFuture;
+
+	FlinkFuture() {
+		scalaFuture = null;
+	}
+
+	public FlinkFuture(scala.concurrent.Future<T> scalaFuture) {
+		this.scalaFuture = Preconditions.checkNotNull(scalaFuture);
+	}
+
+	//-----------------------------------------------------------------------------------
+	// Future's methods
+	//-----------------------------------------------------------------------------------
+
+	@Override
+	public boolean isDone() {
+		return scalaFuture.isCompleted();
+	}
+
+	@Override
+	public boolean cancel(boolean mayInterruptIfRunning) {
+		return false;
+	}
+
+	@Override
+	public T get() throws InterruptedException, ExecutionException {
+		Preconditions.checkNotNull(scalaFuture);
+
+		try {
+			return Await.result(scalaFuture, Duration.Inf());
+		} catch (InterruptedException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new ExecutionException(e);
+		}
+	}
+
+	@Override
+	public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+		Preconditions.checkNotNull(scalaFuture);
+		Preconditions.checkArgument(timeout >= 0L, "The timeout value has to be larger or " +
+			"equal than 0.");
+
+		try {
+			return Await.result(scalaFuture, new FiniteDuration(timeout, unit));
+		} catch (InterruptedException | TimeoutException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new ExecutionException(e);
+		}
+	}
+
+	@Override
+	public T getNow(T valueIfAbsent) throws ExecutionException {
+		Preconditions.checkNotNull(scalaFuture);
+		Preconditions.checkNotNull(valueIfAbsent);
+
+		Option<Try<T>> value = scalaFuture.value();
+
+		if (value.isDefined()) {
+			Try<T> tri = value.get();
+
+			if (tri instanceof Success) {
+				return ((Success<T>)tri).value();
+			} else {
+				throw new ExecutionException(((Failure<T>)tri).exception());
+			}
+		} else {
+			return valueIfAbsent;
+		}
+	}
+
+	@Override
+	public <R> Future<R> thenApplyAsync(final ApplyFunction<? super T, ? extends R> applyFunction, Executor executor) {
+		Preconditions.checkNotNull(scalaFuture);
+		Preconditions.checkNotNull(applyFunction);
+		Preconditions.checkNotNull(executor);
+
+		scala.concurrent.Future<R> mappedFuture = scalaFuture.map(new Mapper<T, R>() {
+			@Override
+			public R apply(T value) {
+				return applyFunction.apply(value);
+			}
+		}, createExecutionContext(executor));
+
+		return new FlinkFuture<>(mappedFuture);
+	}
+
+	@Override
+	public Future<Void> thenAcceptAsync(final AcceptFunction<? super T> acceptFunction, Executor executor) {
+		Preconditions.checkNotNull(scalaFuture);
+		Preconditions.checkNotNull(acceptFunction);
+		Preconditions.checkNotNull(executor);
+
+		scala.concurrent.Future<Void> acceptedFuture = scalaFuture.map(new Mapper<T, Void>() {
+			@Override
+			public Void apply(T value) {
+				acceptFunction.accept(value);
+
+				return null;
+			}
+		}, createExecutionContext(executor));
+
+		return new FlinkFuture<>(acceptedFuture);
+	}
+
+	@Override
+	public <R> Future<R> exceptionallyAsync(final ApplyFunction<Throwable, ? extends R> exceptionallyFunction, Executor executor) {
+		Preconditions.checkNotNull(scalaFuture);
+		Preconditions.checkNotNull(exceptionallyFunction);
+		Preconditions.checkNotNull(executor);
+
+		scala.concurrent.Future<R> recoveredFuture = scalaFuture.recover(new Recover<R>() {
+			@Override
+			public R recover(Throwable failure) throws Throwable {
+				return exceptionallyFunction.apply(failure);
+			}
+		}, createExecutionContext(executor));
+
+		return new FlinkFuture<>(recoveredFuture);
+	}
+
+	@Override
+	public <R> Future<R> thenComposeAsync(final ApplyFunction<? super T, Future<? extends R>> applyFunction, final Executor executor) {
+		Preconditions.checkNotNull(scalaFuture);
+		Preconditions.checkNotNull(applyFunction);
+		Preconditions.checkNotNull(executor);
+
+		scala.concurrent.Future<R> flatMappedFuture = scalaFuture.flatMap(new Mapper<T, scala.concurrent.Future<R>>() {
+			@Override
+			public scala.concurrent.Future<R> apply(T value) {
+				final Future<? extends R> future = applyFunction.apply(value);
+
+				if (future instanceof FlinkFuture) {
+					@SuppressWarnings("unchecked")
+					FlinkFuture<R> flinkFuture = (FlinkFuture<R>) future;
+
+					return flinkFuture.scalaFuture;
+				} else {
+					return Futures.future(new Callable<R>() {
+						@Override
+						public R call() throws Exception {
+							return future.get();
+						}
+					}, createExecutionContext(executor));
+				}
+			}
+		}, createExecutionContext(executor));
+
+		return new FlinkFuture<>(flatMappedFuture);
+	}
+
+	@Override
+	public <R> Future<R> handleAsync(final BiFunction<? super T, Throwable, ? extends R> biFunction, Executor executor) {
+		Preconditions.checkNotNull(scalaFuture);
+		Preconditions.checkNotNull(biFunction);
+		Preconditions.checkNotNull(executor);
+
+		scala.concurrent.Future<R> mappedFuture = scalaFuture.map(new Mapper<T, R>() {
+			@Override
+			public R checkedApply(T value) throws Exception {
+				try {
+					return biFunction.apply(value, null);
+				} catch (Throwable t) {
+					throw new FlinkFuture.WrapperException(t);
+				}
+			}
+		}, createExecutionContext(executor));
+
+		scala.concurrent.Future<R> recoveredFuture = mappedFuture.recover(new Recover<R>() {
+			@Override
+			public R recover(Throwable failure) throws Throwable {
+				if (failure instanceof FlinkFuture.WrapperException) {
+					throw failure.getCause();
+				} else {
+					return biFunction.apply(null, failure);
+				}
+			}
+		}, createExecutionContext(executor));
+
+
+		return new FlinkFuture<>(recoveredFuture);
+	}
+
+	//-----------------------------------------------------------------------------------
+	// Static factory methods
+	//-----------------------------------------------------------------------------------
+
+	/**
+	 * Creates a future whose value is determined by the asynchronously executed callable.
+	 *
+	 * @param callable whose value is delivered by the future
+	 * @param executor to be used to execute the callable
+	 * @param <T> type of the future's value
+	 * @return future which represents the value of the callable
+	 */
+	public static <T> Future<T> supplyAsync(Callable<T> callable, Executor executor) {
+		Preconditions.checkNotNull(callable);
+		Preconditions.checkNotNull(executor);
+
+		scala.concurrent.Future<T> scalaFuture = Futures.future(callable, createExecutionContext(executor));
+
+		return new FlinkFuture<>(scalaFuture);
+	}
+
+	//-----------------------------------------------------------------------------------
+	// Helper functions and types
+	//-----------------------------------------------------------------------------------
+
+	private static ExecutionContext createExecutionContext(Executor executor) {
+		return ExecutionContexts$.MODULE$.fromExecutor(executor);
+	}
+
+	private static class WrapperException extends Exception {
+
+		private static final long serialVersionUID = 6533166370660884091L;
+
+		WrapperException(Throwable cause) {
+			super(cause);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8138f4b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
new file mode 100644
index 0000000..bd5af66
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for Flink's future implementation.
+ */
+public class FlinkFutureTest extends TestLogger {
+
+	private static ExecutorService executor;
+
+	@BeforeClass
+	public static void setup() {
+		executor = Executors.newSingleThreadExecutor();
+	}
+
+	@AfterClass
+	public static void teardown() {
+		executor.shutdown();
+	}
+
+	@Test
+	public void testFutureApply() throws Exception {
+		int expectedValue = 42;
+
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+
+		Future<String> appliedFuture = initialFuture.thenApplyAsync(new ApplyFunction<Integer, String>() {
+			@Override
+			public String apply(Integer value) {
+				return String.valueOf(value);
+			}
+		}, executor);
+
+		initialFuture.complete(expectedValue);
+
+		assertEquals(String.valueOf(expectedValue), appliedFuture.get());
+	}
+
+	@Test(expected = TimeoutException.class)
+	public void testFutureGetTimeout() throws InterruptedException, ExecutionException, TimeoutException {
+		CompletableFuture<Integer> future = new FlinkCompletableFuture<>();
+
+		future.get(10, TimeUnit.MILLISECONDS);
+
+		fail("Get should have thrown a timeout exception.");
+	}
+
+	@Test(expected = TestException.class)
+	public void testExceptionalCompletion() throws Throwable {
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+
+		initialFuture.completeExceptionally(new TestException("Test exception"));
+
+		try {
+			initialFuture.get();
+
+			fail("Get should have thrown an exception.");
+		} catch (ExecutionException e) {
+			throw e.getCause();
+		}
+	}
+
+	/**
+	 * Tests that an exception is propagated through an apply function.
+	 */
+	@Test(expected = TestException.class)
+	public void testExceptionPropagation() throws Throwable {
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+
+		Future<String> mappedFuture = initialFuture.thenApplyAsync(new ApplyFunction<Integer, String>() {
+			@Override
+			public String apply(Integer value) {
+				throw new TestException("Test exception");
+			}
+		}, executor);
+
+		Future<String> mapped2Future = mappedFuture.thenApplyAsync(new ApplyFunction<String, String>() {
+			@Override
+			public String apply(String value) {
+				return "foobar";
+			}
+		}, executor);
+
+		initialFuture.complete(42);
+
+		try {
+			mapped2Future.get();
+
+			fail("Get should have thrown an exception.");
+		} catch (ExecutionException e) {
+			throw e.getCause();
+		}
+	}
+
+	@Test
+	public void testExceptionally() throws ExecutionException, InterruptedException {
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+		String exceptionMessage = "Foobar";
+
+		Future<String> recovered = initialFuture.exceptionallyAsync(new ApplyFunction<Throwable, String>() {
+			@Override
+			public String apply(Throwable value) {
+				return value.getMessage();
+			}
+		}, executor);
+
+		initialFuture.completeExceptionally(new TestException(exceptionMessage));
+
+		String actualMessage = recovered.get();
+
+		assertEquals(exceptionMessage, actualMessage);
+	}
+
+	@Test
+	public void testCompose() throws ExecutionException, InterruptedException {
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+
+		final int expectedValue = 42;
+
+		Future<Integer> composedFuture = initialFuture.thenComposeAsync(new ApplyFunction<Integer, Future<? extends Integer>>() {
+			@Override
+			public Future<? extends Integer> apply(Integer value) {
+				return FlinkFuture.supplyAsync(new Callable<Integer>() {
+					@Override
+					public Integer call() throws Exception {
+						return expectedValue;
+					}
+				}, executor);
+			}
+		}, executor);
+
+		initialFuture.complete(42);
+
+		int actualValue = composedFuture.get();
+
+		assertEquals(expectedValue, actualValue);
+	}
+
+	@Test
+	public void testGetNow() throws ExecutionException {
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+
+		final int absentValue = 41;
+
+		assertEquals(new Integer(absentValue), initialFuture.getNow(absentValue));
+	}
+
+	@Test
+	public void testAccept() throws ExecutionException, InterruptedException {
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+		final AtomicInteger atomicInteger = new AtomicInteger(0);
+		int expectedValue = 42;
+
+		Future<Void> result = initialFuture.thenAcceptAsync(new AcceptFunction<Integer>() {
+			@Override
+			public void accept(Integer value) {
+				atomicInteger.set(value);
+			}
+		}, executor);
+
+		initialFuture.complete(expectedValue);
+
+		result.get();
+
+		assertEquals(expectedValue, atomicInteger.get());
+	}
+
+	@Test
+	public void testHandle() throws ExecutionException, InterruptedException {
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+		int expectedValue = 43;
+
+		Future<String> result = initialFuture.handleAsync(new BiFunction<Integer, Throwable, String>() {
+			@Override
+			public String apply(Integer integer, Throwable throwable) {
+				if (integer != null) {
+					return String.valueOf(integer);
+				} else {
+					return throwable.getMessage();
+				}
+			}
+		}, executor);
+
+		initialFuture.complete(expectedValue);
+
+		assertEquals(String.valueOf(expectedValue), result.get());
+	}
+
+	@Test
+	public void testHandleException() throws ExecutionException, InterruptedException {
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+		String exceptionMessage = "foobar";
+
+		Future<String> result = initialFuture.handleAsync(new BiFunction<Integer, Throwable, String>() {
+			@Override
+			public String apply(Integer integer, Throwable throwable) {
+				if (integer != null) {
+					return String.valueOf(integer);
+				} else {
+					return throwable.getMessage();
+				}
+			}
+		}, executor);
+
+		initialFuture.completeExceptionally(new TestException(exceptionMessage));
+
+		assertEquals(exceptionMessage, result.get());
+	}
+
+	@Test
+	public void testMultipleCompleteOperations() throws ExecutionException, InterruptedException {
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+		int expectedValue = 42;
+
+		assertTrue(initialFuture.complete(expectedValue));
+
+		assertFalse(initialFuture.complete(1337));
+
+		assertFalse(initialFuture.completeExceptionally(new TestException("foobar")));
+
+		assertEquals(new Integer(expectedValue), initialFuture.get());
+	}
+
+	private static class TestException extends RuntimeException {
+
+		private static final long serialVersionUID = -1274022962838535130L;
+
+		public TestException(String message) {
+			super(message);
+		}
+	}
+}


[2/3] flink git commit: [FLINK-4690] Replace SlotAllocationFuture with flink's own future

Posted by tr...@apache.org.
[FLINK-4690] Replace SlotAllocationFuture with flink's own future

This closes #2552.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b88f1a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b88f1a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b88f1a7

Branch: refs/heads/master
Commit: 7b88f1a75ea92f6b26624a7358e7fcafa3e9506f
Parents: f8138f4
Author: Kurt Young <yk...@gmail.com>
Authored: Tue Sep 27 12:10:08 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 27 18:39:36 2016 +0200

----------------------------------------------------------------------
 .../runtime/concurrent/impl/FlinkFuture.java    |   1 -
 .../flink/runtime/executiongraph/Execution.java |  55 ++---
 .../flink/runtime/instance/SlotProvider.java    |   6 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |  24 ++-
 .../scheduler/SlotAllocationFuture.java         | 146 --------------
 .../scheduler/SlotAllocationFutureAction.java   |  34 ----
 .../ExecutionGraphMetricsTest.java              |   9 +-
 .../ExecutionVertexSchedulingTest.java          |  19 +-
 .../scheduler/SchedulerIsolatedTasksTest.java   |  31 ++-
 .../scheduler/SlotAllocationFutureTest.java     | 200 -------------------
 10 files changed, 80 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index 361cd3d..3f2c5e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -104,7 +104,6 @@ public class FlinkFuture<T> implements Future<T> {
 	@Override
 	public T getNow(T valueIfAbsent) throws ExecutionException {
 		Preconditions.checkNotNull(scalaFuture);
-		Preconditions.checkNotNull(valueIfAbsent);
 
 		Option<Try<T>> value = scalaFuture.value();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 6826365..8c02e1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,18 +20,18 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
-
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -41,20 +41,18 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
-
 import org.slf4j.Logger;
 
 import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
+import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
@@ -299,32 +297,43 @@ public class Execution {
 
 			// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
 			//     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
-			final SlotAllocationFuture future = slotProvider.allocateSlot(toSchedule, queued);
+			final Future<SimpleSlot> future = slotProvider.allocateSlot(toSchedule, queued);
+
 			if (queued) {
-				future.setFutureAction(new SlotAllocationFutureAction() {
+				future.handleAsync(new BiFunction<SimpleSlot, Throwable, Void>() {
 					@Override
-					public void slotAllocated(SimpleSlot slot) {
-						try {
-							deployToSlot(slot);
-						}
-						catch (Throwable t) {
+					public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+						if (simpleSlot != null) {
 							try {
-								slot.releaseSlot();
-							} finally {
-								markFailed(t);
+								deployToSlot(simpleSlot);
+							} catch (Throwable t) {
+								try {
+									simpleSlot.releaseSlot();
+								} finally {
+									markFailed(t);
+								}
 							}
 						}
+						else {
+							markFailed(throwable);
+						}
+						return null;
 					}
-				});
+				}, ExecutionContext$.MODULE$.global());
 			}
 			else {
-				SimpleSlot slot = future.get();
+				SimpleSlot slot = null;
 				try {
+					// when queueing is not allowed, allocateSlot either returns an already completed future
+					// or throws a NoResourceAvailableException, so the slot can be read without blocking.
+					slot = checkNotNull(future.getNow(null));
 					deployToSlot(slot);
 				}
 				catch (Throwable t) {
 					try {
-						slot.releaseSlot();
+						if (slot != null) {
+							slot.releaseSlot();
+						}
 					} finally {
 						markFailed(t);
 					}
@@ -394,7 +403,7 @@ public class Execution {
 			
 			final ActorGateway gateway = slot.getTaskManagerActorGateway();
 
-			final Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);
+			final scala.concurrent.Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);
 
 			deployAction.onComplete(new OnComplete<Object>(){
 
@@ -436,7 +445,7 @@ public class Execution {
 		if (slot != null) {
 			final ActorGateway gateway = slot.getTaskManagerActorGateway();
 
-			Future<Object> stopResult = gateway.retry(
+			scala.concurrent.Future<Object> stopResult = gateway.retry(
 				new StopTask(attemptId),
 				NUM_STOP_CALL_TRIES,
 				timeout,
@@ -916,7 +925,7 @@ public class Execution {
 
 			final ActorGateway gateway = slot.getTaskManagerActorGateway();
 
-			Future<Object> cancelResult = gateway.retry(
+			scala.concurrent.Future<Object> cancelResult = gateway.retry(
 				new CancelTask(attemptId),
 				NUM_CANCEL_CALL_TRIES,
 				timeout,
@@ -965,7 +974,7 @@ public class Execution {
 			final ActorGateway gateway = consumerSlot.getTaskManagerActorGateway();
 			final TaskManagerLocation taskManagerLocation = consumerSlot.getTaskManagerLocation();
 
-			Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout);
+			scala.concurrent.Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout);
 
 			futureUpdate.onFailure(new OnFailure() {
 				@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index b2c23a5..49e6d9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 
 /**
  * The slot provider is responsible for preparing slots for ready-to-run tasks.
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
  * <p>It supports two allocating modes:
  * <ul>
  *     <li>Immediate allocating: A request for a task slot immediately gets satisfied, we can call
- *         {@link SlotAllocationFuture#get()} to get the allocated slot.</li>
+ *         {@link Future#getNow(Object)} to get the allocated slot.</li>
  *     <li>Queued allocating: A request for a task slot is queued and returns a future that will be
  *         fulfilled as soon as a slot becomes available.</li>
  * </ul>
@@ -44,5 +44,5 @@ public interface SlotProvider {
 	 * 
 	 * @throws NoResourceAvailableException
 	 */
-	SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException;
+	Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index c9cdd00..ce2f6f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -39,6 +39,8 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -133,15 +135,17 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 
 	@Override
-	public SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued) 
+	public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
 			throws NoResourceAvailableException {
 
 		final Object ret = scheduleTask(task, allowQueued);
 		if (ret instanceof SimpleSlot) {
-			return new SlotAllocationFuture((SimpleSlot) ret);
+			FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+			future.complete((SimpleSlot) ret);
+			return future;
 		}
-		else if (ret instanceof SlotAllocationFuture) {
-			return (SlotAllocationFuture) ret;
+		else if (ret instanceof Future) {
+			return (Future) ret;
 		}
 		else {
 			throw new RuntimeException();
@@ -149,7 +153,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 	}
 
 	/**
-	 * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link SlotAllocationFuture}.
+	 * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link Future}.
 	 */
 	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
 		if (task == null) {
@@ -312,7 +316,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				else {
 					// no resource available now, so queue the request
 					if (queueIfNoResource) {
-						SlotAllocationFuture future = new SlotAllocationFuture();
+						FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
 						this.taskQueue.add(new QueuedTask(task, future));
 						return future;
 					}
@@ -560,7 +564,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 						taskQueue.poll();
 						if (queued.getFuture() != null) {
 							try {
-								queued.getFuture().setSlot(newSlot);
+								queued.getFuture().complete(newSlot);
 							}
 							catch (Throwable t) {
 								LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t);
@@ -829,10 +833,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 		
 		private final ScheduledUnit task;
 		
-		private final SlotAllocationFuture future;
+		private final FlinkCompletableFuture<SimpleSlot> future;
 		
 		
-		public QueuedTask(ScheduledUnit task, SlotAllocationFuture future) {
+		public QueuedTask(ScheduledUnit task, FlinkCompletableFuture<SimpleSlot> future) {
 			this.task = task;
 			this.future = future;
 		}
@@ -841,7 +845,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 			return task;
 		}
 
-		public SlotAllocationFuture getFuture() {
+		public FlinkCompletableFuture<SimpleSlot> getFuture() {
 			return future;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
deleted file mode 100644
index 36e4072..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * 
- */
-public class SlotAllocationFuture {
-
-	private final Object monitor = new Object();
-
-	private volatile SimpleSlot slot;
-
-	private volatile SlotAllocationFutureAction action;
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Creates a future that is uncompleted.
-	 */
-	public SlotAllocationFuture() {}
-
-	/**
-	 * Creates a future that is immediately completed.
-	 * 
-	 * @param slot The task slot that completes the future.
-	 */
-	public SlotAllocationFuture(SimpleSlot slot) {
-		this.slot = slot;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public SimpleSlot waitTillCompleted() throws InterruptedException {
-		synchronized (monitor) {
-			while (slot == null) {
-				monitor.wait();
-			}
-			return slot;
-		}
-	}
-
-	public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
-		checkArgument(timeout >= 0, "timeout may not be negative");
-		checkNotNull(timeUnit, "timeUnit");
-
-		if (timeout == 0) {
-			return waitTillCompleted();
-		} else {
-			final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
-			long millisToWait;
-
-			synchronized (monitor) {
-				while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
-					monitor.wait(millisToWait);
-				}
-
-				if (slot != null) {
-					return slot;
-				} else {
-					throw new TimeoutException();
-				}
-			}
-		}
-	}
-
-	/**
-	 * Gets the slot from this future. This method throws an exception, if the future has not been completed.
-	 * This method never blocks.
-	 * 
-	 * @return The slot with which this future was completed.
-	 * @throws IllegalStateException Thrown, if this method is called before the future is completed.
-	 */
-	public SimpleSlot get() {
-		final SimpleSlot slot = this.slot;
-		if (slot != null) {
-			return slot;
-		} else {
-			throw new IllegalStateException("The future is not complete - not slot available");
-		}
-	}
-
-	public void setFutureAction(SlotAllocationFutureAction action) {
-		checkNotNull(action);
-
-		synchronized (monitor) {
-			checkState(this.action == null, "Future already has an action registered.");
-
-			this.action = action;
-
-			if (this.slot != null) {
-				action.slotAllocated(this.slot);
-			}
-		}
-	}
-
-	/**
-	 * Completes the future with a slot.
-	 */
-	public void setSlot(SimpleSlot slot) {
-		checkNotNull(slot);
-
-		synchronized (monitor) {
-			checkState(this.slot == null, "The future has already been assigned a slot.");
-
-			this.slot = slot;
-			monitor.notifyAll();
-
-			if (action != null) {
-				action.slotAllocated(slot);
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return slot == null ? "PENDING" : "DONE";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
deleted file mode 100644
index f9d032f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-/**
- * An action that is invoked once a {@link SlotAllocationFuture} is triggered.
- */
-public interface SlotAllocationFutureAction {
-
-	/**
-	 * This method is called as soon as the SlotAllocationFuture is triggered.
-	 * 
-	 * @param slot The slot that has been allocated.
-	 */
-	void slotAllocated(SimpleSlot slot);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index aa5925f..a58d910 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -29,11 +29,11 @@ import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
@@ -136,10 +136,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 			when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
 			when(simpleSlot.getRoot()).thenReturn(rootSlot);
 
-			when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean()))
-					.thenReturn(new SlotAllocationFuture(simpleSlot));
-
-			
+			FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+			future.complete(simpleSlot);
+			when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
 
 			when(rootSlot.getSlotNumber()).thenReturn(0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index c576ce5..104f4ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
@@ -26,7 +27,6 @@ import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
@@ -58,8 +58,9 @@ public class ExecutionVertexSchedulingTest {
 			assertTrue(slot.isReleased());
 
 			Scheduler scheduler = mock(Scheduler.class);
-			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
-				.thenReturn(new SlotAllocationFuture(slot));
+			FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+			future.complete(slot);
+			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
@@ -88,7 +89,7 @@ public class ExecutionVertexSchedulingTest {
 			slot.releaseSlot();
 			assertTrue(slot.isReleased());
 
-			final SlotAllocationFuture future = new SlotAllocationFuture();
+			final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
 
 			Scheduler scheduler = mock(Scheduler.class);
 			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
@@ -100,7 +101,10 @@ public class ExecutionVertexSchedulingTest {
 			// future has not yet a slot
 			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
 
-			future.setSlot(slot);
+			future.complete(slot);
+
+			// wait a second for the future's action to be executed
+			Thread.sleep(1000);
 
 			// will have failed
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
@@ -122,8 +126,9 @@ public class ExecutionVertexSchedulingTest {
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			Scheduler scheduler = mock(Scheduler.class);
-			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
-				.thenReturn(new SlotAllocationFuture(slot));
+			FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+			future.complete(slot);
+			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index d78f551..9c21533 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -194,7 +196,7 @@ public class SchedulerIsolatedTasksTest {
 			final int totalSlots = scheduler.getNumberOfAvailableSlots();
 
 			// all slots we ever got.
-			List<SlotAllocationFuture> allAllocatedSlots = new ArrayList<SlotAllocationFuture>();
+			List<Future<SimpleSlot>> allAllocatedSlots = new ArrayList<>();
 
 			// slots that need to be released
 			final Set<SimpleSlot> toRelease = new HashSet<SimpleSlot>();
@@ -202,17 +204,6 @@ public class SchedulerIsolatedTasksTest {
 			// flag to track errors in the concurrent thread
 			final AtomicBoolean errored = new AtomicBoolean(false);
 
-
-			SlotAllocationFutureAction action = new SlotAllocationFutureAction() {
-				@Override
-				public void slotAllocated(SimpleSlot slot) {
-					synchronized (toRelease) {
-						toRelease.add(slot);
-						toRelease.notifyAll();
-					}
-				}
-			};
-
 			// thread to asynchronously release slots
 			Runnable disposer = new Runnable() {
 
@@ -244,8 +235,16 @@ public class SchedulerIsolatedTasksTest {
 			disposeThread.start();
 
 			for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
-				SlotAllocationFuture future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
-				future.setFutureAction(action);
+				Future<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
+				future.thenAcceptAsync(new AcceptFunction<SimpleSlot>() {
+					@Override
+					public void accept(SimpleSlot slot) {
+						synchronized (toRelease) {
+							toRelease.add(slot);
+							toRelease.notifyAll();
+						}
+					}
+				}, TestingUtils.defaultExecutionContext());
 				allAllocatedSlots.add(future);
 			}
 
@@ -254,8 +253,8 @@ public class SchedulerIsolatedTasksTest {
 			assertFalse("The slot releasing thread caused an error.", errored.get());
 
 			List<SimpleSlot> slotsAfter = new ArrayList<SimpleSlot>();
-			for (SlotAllocationFuture future : allAllocatedSlots) {
-				slotsAfter.add(future.waitTillCompleted());
+			for (Future<SimpleSlot> future : allAllocatedSlots) {
+				slotsAfter.add(future.get());
 			}
 
 			assertEquals("All instances should have available slots.", NUM_INSTANCES,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
deleted file mode 100644
index ea0d2cc..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-public class SlotAllocationFutureTest {
-
-	@Test
-	public void testInvalidActions() {
-		try {
-			final SlotAllocationFuture future = new SlotAllocationFuture();
-			
-			SlotAllocationFutureAction action = new SlotAllocationFutureAction() {
-				@Override
-				public void slotAllocated(SimpleSlot slot) {}
-			};
-			
-			future.setFutureAction(action);
-			try {
-				future.setFutureAction(action);
-				fail();
-			} catch (IllegalStateException e) {
-				// expected
-			}
-
-			final Instance instance1 = SchedulerTestUtils.getRandomInstance(1);
-			final Instance instance2 = SchedulerTestUtils.getRandomInstance(1);
-
-			final SimpleSlot slot1 = new SimpleSlot(new JobID(), instance1,
-					instance1.getTaskManagerLocation(), 0, instance1.getActorGateway(), null, null);
-			final SimpleSlot slot2 = new SimpleSlot(new JobID(), instance2,
-					instance2.getTaskManagerLocation(), 0, instance2.getActorGateway(), null, null);
-			
-			future.setSlot(slot1);
-			try {
-				future.setSlot(slot2);
-				fail();
-			} catch (IllegalStateException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void setWithAction() {
-		try {
-			
-			// action before the slot
-			{
-				final AtomicInteger invocations = new AtomicInteger();
-
-				final Instance instance = SchedulerTestUtils.getRandomInstance(1);
-
-				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
-						instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
-				
-				SlotAllocationFuture future = new SlotAllocationFuture();
-				
-				future.setFutureAction(new SlotAllocationFutureAction() {
-					@Override
-					public void slotAllocated(SimpleSlot slot) {
-						assertEquals(thisSlot, slot);
-						invocations.incrementAndGet();
-					}
-				});
-				
-				future.setSlot(thisSlot);
-				
-				assertEquals(1, invocations.get());
-			}
-			
-			// slot before action
-			{
-				final AtomicInteger invocations = new AtomicInteger();
-				final Instance instance = SchedulerTestUtils.getRandomInstance(1);
-				
-				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
-						instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
-				
-				SlotAllocationFuture future = new SlotAllocationFuture();
-				future.setSlot(thisSlot);
-				
-				future.setFutureAction(new SlotAllocationFutureAction() {
-					@Override
-					public void slotAllocated(SimpleSlot slot) {
-						assertEquals(thisSlot, slot);
-						invocations.incrementAndGet();
-					}
-				});
-				
-				assertEquals(1, invocations.get());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void setSync() {
-		try {
-			// sync before setting the slot
-			{
-				final AtomicInteger invocations = new AtomicInteger();
-				final AtomicBoolean error = new AtomicBoolean();
-
-				final Instance instance = SchedulerTestUtils.getRandomInstance(1);
-
-				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
-						instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
-				
-				final SlotAllocationFuture future = new SlotAllocationFuture();
-				
-				
-				Runnable r = new Runnable() {
-					@Override
-					public void run() {
-						try {
-							SimpleSlot syncSlot = future.waitTillCompleted();
-							if (syncSlot == null || syncSlot != thisSlot) {
-								error.set(true);
-								return;
-							}
-							invocations.incrementAndGet();
-						}
-						catch (Throwable t) {
-							error.set(true);
-						}
-					}
-				};
-				
-				Thread syncer = new Thread(r);
-				syncer.start();
-				
-				// wait, and give the sync thread a chance to sync
-				Thread.sleep(10);
-				future.setSlot(thisSlot);
-				
-				syncer.join();
-				
-				assertFalse(error.get());
-				assertEquals(1, invocations.get());
-			}
-			
-			// setting slot before syncing
-			{
-				final Instance instance = SchedulerTestUtils.getRandomInstance(1);
-
-				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance, 
-						instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
-				final SlotAllocationFuture future = new SlotAllocationFuture();
-
-				future.setSlot(thisSlot);
-				
-				SimpleSlot retrieved = future.waitTillCompleted();
-				
-				assertNotNull(retrieved);
-				assertEquals(thisSlot, retrieved);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}