You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 20:52:38 UTC

flink git commit: [FLINK-7324] [futures] Replace Flink's future with Java 8's CompletableFuture in SlotPool

Repository: flink
Updated Branches:
  refs/heads/master 3b97784ae -> 6d4981a43


[FLINK-7324] [futures] Replace Flink's future with Java 8's CompletableFuture in SlotPool

Address PR comments

This closes #4438.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d4981a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d4981a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d4981a4

Branch: refs/heads/master
Commit: 6d4981a431e1ad28dee3a2143477fa7d2696d5fd
Parents: 3b97784
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 19:35:14 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 22:52:11 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 25 +++++++
 .../apache/flink/runtime/instance/SlotPool.java | 70 ++++++++++----------
 2 files changed, 59 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d4981a4/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 9cdbe1f..8721e52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -344,4 +344,29 @@ public class FutureUtils {
 
 		return result;
 	}
+
+	/**
+	 * Converts a Java 8 {@link java.util.concurrent.CompletableFuture} into a Flink {@link Future}.
+	 *
+	 * @param javaFuture to convert to a Flink future
+	 * @param <T> type of the future value
+	 * @return Flink future
+	 *
+	 * @deprecated Will be removed once we completely remove Flink's futures
+	 */
+	@Deprecated
+	public static <T> Future<T> toFlinkFuture(java.util.concurrent.CompletableFuture<T> javaFuture) {
+		FlinkCompletableFuture<T> result = new FlinkCompletableFuture<>();
+
+		javaFuture.whenComplete(
+			(value, throwable) -> {
+				if (throwable == null) {
+					result.complete(value);
+				} else {
+					result.completeExceptionally(throwable);
+				}
+			});
+
+		return result;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d4981a4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 8cf6a9b..c74d9a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -25,10 +25,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -57,6 +55,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -246,7 +245,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 		// work on all slots waiting for this connection
 		for (PendingRequest pending : waitingForResourceManager.values()) {
-			requestSlotFromResourceManager(pending.allocationID(), pending.future(), pending.resourceProfile());
+			requestSlotFromResourceManager(pending.allocationID(), pending.getFuture(), pending.resourceProfile());
 		}
 
 		// all sent off
@@ -269,7 +268,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 			ResourceProfile resources,
 			Iterable<TaskManagerLocation> locationPreferences) {
 
-		return internalAllocateSlot(task, resources, locationPreferences);
+		return FutureUtils.toFlinkFuture(internalAllocateSlot(task, resources, locationPreferences));
 	}
 
 	@RpcMethod
@@ -278,7 +277,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	}
 
 
-	Future<SimpleSlot> internalAllocateSlot(
+	CompletableFuture<SimpleSlot> internalAllocateSlot(
 			ScheduledUnit task,
 			ResourceProfile resources,
 			Iterable<TaskManagerLocation> locationPreferences) {
@@ -288,12 +287,12 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 		if (slotFromPool != null) {
 			SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality());
 			allocatedSlots.add(slot);
-			return FlinkCompletableFuture.completed(slot);
+			return CompletableFuture.completedFuture(slot);
 		}
 
 		// the request will be completed by a future
 		final AllocationID allocationID = new AllocationID();
-		final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+		final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 
 		// (2) need to request a slot
 
@@ -310,34 +309,33 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 	private void requestSlotFromResourceManager(
 			final AllocationID allocationID,
-			final FlinkCompletableFuture<SimpleSlot> future,
+			final CompletableFuture<SimpleSlot> future,
 			final ResourceProfile resources) {
 
 		LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID);
 
 		pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
 
-		Future<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
+		CompletableFuture<Acknowledge> rmResponse = FutureUtils.toJava(
+			resourceManagerGateway.requestSlot(
 				jobManagerLeaderId, resourceManagerLeaderId,
 				new SlotRequest(jobId, allocationID, resources, jobManagerAddress),
-				resourceManagerRequestsTimeout);
+				resourceManagerRequestsTimeout));
 
-		Future<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(new AcceptFunction<Acknowledge>() {
-			@Override
-			public void accept(Acknowledge value) {
+		CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(
+			(Acknowledge value) -> {
 				slotRequestToResourceManagerSuccess(allocationID);
-			}
-		}, getMainThreadExecutor());
+			},
+			getMainThreadExecutor());
 
 		// on failure, fail the request future
-		slotRequestProcessingFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-
-			@Override
-			public Void apply(Throwable failure) {
-				slotRequestToResourceManagerFailed(allocationID, failure);
-				return null;
-			}
-		}, getMainThreadExecutor());
+		slotRequestProcessingFuture.whenCompleteAsync(
+			(Void v, Throwable failure) -> {
+				if (failure != null) {
+					slotRequestToResourceManagerFailed(allocationID, failure);
+				}
+			},
+			getMainThreadExecutor());
 	}
 
 	private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) {
@@ -354,7 +352,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throwable failure) {
 		PendingRequest request = pendingRequests.remove(allocationID);
 		if (request != null) {
-			request.future().completeExceptionally(new NoResourceAvailableException(
+			request.getFuture().completeExceptionally(new NoResourceAvailableException(
 					"No pooled slot available and request to ResourceManager for new slot failed", failure));
 		} else {
 			if (LOG.isDebugEnabled()) {
@@ -365,15 +363,15 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 	private void checkTimeoutSlotAllocation(AllocationID allocationID) {
 		PendingRequest request = pendingRequests.remove(allocationID);
-		if (request != null && !request.future().isDone()) {
-			request.future().completeExceptionally(new TimeoutException("Slot allocation request timed out"));
+		if (request != null && !request.getFuture().isDone()) {
+			request.getFuture().completeExceptionally(new TimeoutException("Slot allocation request timed out"));
 		}
 	}
 
 	private void stashRequestWaitingForResourceManager(
 			final AllocationID allocationID,
 			final ResourceProfile resources,
-			final FlinkCompletableFuture<SimpleSlot> future) {
+			final CompletableFuture<SimpleSlot> future) {
 
 		LOG.info("Cannot serve slot request, no ResourceManager connected. " +
 				"Adding as pending request {}",  allocationID);
@@ -390,8 +388,8 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 	private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) {
 		PendingRequest request = waitingForResourceManager.remove(allocationID);
-		if (request != null && !request.future().isDone()) {
-			request.future().completeExceptionally(new NoResourceAvailableException(
+		if (request != null && !request.getFuture().isDone()) {
+			request.getFuture().completeExceptionally(new NoResourceAvailableException(
 					"No slot available and no connection to Resource Manager established."));
 		}
 	}
@@ -426,7 +424,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 					SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
 					allocatedSlots.add(newSlot);
-					pendingRequest.future().complete(newSlot);
+					pendingRequest.getFuture().complete(newSlot);
 				}
 				else {
 					LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId());
@@ -513,7 +511,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 		if (pendingRequest != null) {
 			// we were waiting for this!
 			SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN);
-			pendingRequest.future().complete(resultSlot);
+			pendingRequest.getFuture().complete(resultSlot);
 			allocatedSlots.add(resultSlot);
 		}
 		else {
@@ -552,7 +550,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 		if (pendingRequest != null) {
 			// request was still pending
 			LOG.debug("Failed pending request [{}] with ", allocationID, cause);
-			pendingRequest.future().completeExceptionally(cause);
+			pendingRequest.getFuture().completeExceptionally(cause);
 		}
 		else if (availableSlots.tryRemove(allocationID)) {
 			LOG.debug("Failed available slot [{}] with ", allocationID, cause);
@@ -999,13 +997,13 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 		private final AllocationID allocationID;
 
-		private final FlinkCompletableFuture<SimpleSlot> future;
+		private final CompletableFuture<SimpleSlot> future;
 
 		private final ResourceProfile resourceProfile;
 
 		PendingRequest(
 				AllocationID allocationID,
-				FlinkCompletableFuture<SimpleSlot> future,
+				CompletableFuture<SimpleSlot> future,
 				ResourceProfile resourceProfile) {
 			this.allocationID = allocationID;
 			this.future = future;
@@ -1016,7 +1014,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 			return allocationID;
 		}
 
-		public FlinkCompletableFuture<SimpleSlot> future() {
+		public CompletableFuture<SimpleSlot> getFuture() {
 			return future;
 		}