You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 20:52:38 UTC
flink git commit: [FLINK-7324] [futures] Replace Flink's future with Java 8's CompletableFuture in SlotPool
Repository: flink
Updated Branches:
refs/heads/master 3b97784ae -> 6d4981a43
[FLINK-7324] [futures] Replace Flink's future with Java 8's CompletableFuture in SlotPool
Address PR comments
This closes #4438.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d4981a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d4981a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d4981a4
Branch: refs/heads/master
Commit: 6d4981a431e1ad28dee3a2143477fa7d2696d5fd
Parents: 3b97784
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 19:35:14 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 22:52:11 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/concurrent/FutureUtils.java | 25 +++++++
.../apache/flink/runtime/instance/SlotPool.java | 70 ++++++++++----------
2 files changed, 59 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6d4981a4/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 9cdbe1f..8721e52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -344,4 +344,29 @@ public class FutureUtils {
return result;
}
+
+ /**
+ * Converts a Java 8 {@link java.util.concurrent.CompletableFuture} into a Flink {@link Future}.
+ *
+ * <p>A new Flink future is created and a completion callback is registered on
+ * {@code javaFuture}. If the callback receives a {@code null} throwable, the Flink future is
+ * completed with the received value; otherwise it is completed exceptionally with the
+ * throwable exactly as received (no unwrapping is performed here).
+ *
+ * <p>Only the direction Java future -> Flink future is wired up by this method; nothing is
+ * registered on the returned Flink future.
+ *
+ * @param javaFuture to convert to a Flink future
+ * @param <T> type of the future value
+ * @return Flink future which is completed with the outcome of {@code javaFuture}
+ *
+ * @deprecated Will be removed once we completely remove Flink's futures
+ */
+ @Deprecated
+ public static <T> Future<T> toFlinkFuture(java.util.concurrent.CompletableFuture<T> javaFuture) {
+ FlinkCompletableFuture<T> result = new FlinkCompletableFuture<>();
+
+ javaFuture.whenComplete(
+ (value, throwable) -> {
+ if (throwable == null) {
+ result.complete(value);
+ } else {
+ result.completeExceptionally(throwable);
+ }
+ });
+
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d4981a4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 8cf6a9b..c74d9a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -25,10 +25,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -57,6 +55,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -246,7 +245,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
// work on all slots waiting for this connection
for (PendingRequest pending : waitingForResourceManager.values()) {
- requestSlotFromResourceManager(pending.allocationID(), pending.future(), pending.resourceProfile());
+ requestSlotFromResourceManager(pending.allocationID(), pending.getFuture(), pending.resourceProfile());
}
// all sent off
@@ -269,7 +268,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
ResourceProfile resources,
Iterable<TaskManagerLocation> locationPreferences) {
- return internalAllocateSlot(task, resources, locationPreferences);
+ return FutureUtils.toFlinkFuture(internalAllocateSlot(task, resources, locationPreferences));
}
@RpcMethod
@@ -278,7 +277,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
}
- Future<SimpleSlot> internalAllocateSlot(
+ CompletableFuture<SimpleSlot> internalAllocateSlot(
ScheduledUnit task,
ResourceProfile resources,
Iterable<TaskManagerLocation> locationPreferences) {
@@ -288,12 +287,12 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
if (slotFromPool != null) {
SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality());
allocatedSlots.add(slot);
- return FlinkCompletableFuture.completed(slot);
+ return CompletableFuture.completedFuture(slot);
}
// the request will be completed by a future
final AllocationID allocationID = new AllocationID();
- final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+ final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
// (2) need to request a slot
@@ -310,34 +309,33 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
private void requestSlotFromResourceManager(
final AllocationID allocationID,
- final FlinkCompletableFuture<SimpleSlot> future,
+ final CompletableFuture<SimpleSlot> future,
final ResourceProfile resources) {
LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID);
pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
- Future<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
+ CompletableFuture<Acknowledge> rmResponse = FutureUtils.toJava(
+ resourceManagerGateway.requestSlot(
jobManagerLeaderId, resourceManagerLeaderId,
new SlotRequest(jobId, allocationID, resources, jobManagerAddress),
- resourceManagerRequestsTimeout);
+ resourceManagerRequestsTimeout));
- Future<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(new AcceptFunction<Acknowledge>() {
- @Override
- public void accept(Acknowledge value) {
+ CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(
+ (Acknowledge value) -> {
slotRequestToResourceManagerSuccess(allocationID);
- }
- }, getMainThreadExecutor());
+ },
+ getMainThreadExecutor());
// on failure, fail the request future
- slotRequestProcessingFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-
- @Override
- public Void apply(Throwable failure) {
- slotRequestToResourceManagerFailed(allocationID, failure);
- return null;
- }
- }, getMainThreadExecutor());
+ slotRequestProcessingFuture.whenCompleteAsync(
+ (Void v, Throwable failure) -> {
+ if (failure != null) {
+ slotRequestToResourceManagerFailed(allocationID, failure);
+ }
+ },
+ getMainThreadExecutor());
}
private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) {
@@ -354,7 +352,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throwable failure) {
PendingRequest request = pendingRequests.remove(allocationID);
if (request != null) {
- request.future().completeExceptionally(new NoResourceAvailableException(
+ request.getFuture().completeExceptionally(new NoResourceAvailableException(
"No pooled slot available and request to ResourceManager for new slot failed", failure));
} else {
if (LOG.isDebugEnabled()) {
@@ -365,15 +363,15 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
private void checkTimeoutSlotAllocation(AllocationID allocationID) {
PendingRequest request = pendingRequests.remove(allocationID);
- if (request != null && !request.future().isDone()) {
- request.future().completeExceptionally(new TimeoutException("Slot allocation request timed out"));
+ if (request != null && !request.getFuture().isDone()) {
+ request.getFuture().completeExceptionally(new TimeoutException("Slot allocation request timed out"));
}
}
private void stashRequestWaitingForResourceManager(
final AllocationID allocationID,
final ResourceProfile resources,
- final FlinkCompletableFuture<SimpleSlot> future) {
+ final CompletableFuture<SimpleSlot> future) {
LOG.info("Cannot serve slot request, no ResourceManager connected. " +
"Adding as pending request {}", allocationID);
@@ -390,8 +388,8 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) {
PendingRequest request = waitingForResourceManager.remove(allocationID);
- if (request != null && !request.future().isDone()) {
- request.future().completeExceptionally(new NoResourceAvailableException(
+ if (request != null && !request.getFuture().isDone()) {
+ request.getFuture().completeExceptionally(new NoResourceAvailableException(
"No slot available and no connection to Resource Manager established."));
}
}
@@ -426,7 +424,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
allocatedSlots.add(newSlot);
- pendingRequest.future().complete(newSlot);
+ pendingRequest.getFuture().complete(newSlot);
}
else {
LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId());
@@ -513,7 +511,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
if (pendingRequest != null) {
// we were waiting for this!
SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN);
- pendingRequest.future().complete(resultSlot);
+ pendingRequest.getFuture().complete(resultSlot);
allocatedSlots.add(resultSlot);
}
else {
@@ -552,7 +550,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
if (pendingRequest != null) {
// request was still pending
LOG.debug("Failed pending request [{}] with ", allocationID, cause);
- pendingRequest.future().completeExceptionally(cause);
+ pendingRequest.getFuture().completeExceptionally(cause);
}
else if (availableSlots.tryRemove(allocationID)) {
LOG.debug("Failed available slot [{}] with ", allocationID, cause);
@@ -999,13 +997,13 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
private final AllocationID allocationID;
- private final FlinkCompletableFuture<SimpleSlot> future;
+ private final CompletableFuture<SimpleSlot> future;
private final ResourceProfile resourceProfile;
PendingRequest(
AllocationID allocationID,
- FlinkCompletableFuture<SimpleSlot> future,
+ CompletableFuture<SimpleSlot> future,
ResourceProfile resourceProfile) {
this.allocationID = allocationID;
this.future = future;
@@ -1016,7 +1014,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
return allocationID;
}
- public FlinkCompletableFuture<SimpleSlot> future() {
+ public CompletableFuture<SimpleSlot> getFuture() {
return future;
}