You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 21:18:08 UTC
flink git commit: [FLINK-7332] [futures] Replace Flink's futures with
Java 8's CompletableFuture in TaskExecutor
Repository: flink
Updated Branches:
refs/heads/master a6c8953eb -> d2a8e3741
[FLINK-7332] [futures] Replace Flink's futures with Java 8's CompletableFuture in TaskExecutor
This closes #4448.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2a8e374
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2a8e374
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2a8e374
Branch: refs/heads/master
Commit: d2a8e37415eb34ca9cb8b2d8c22a33aa99b494a6
Parents: a6c8953
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Aug 1 10:46:40 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 23:13:56 2017 +0200
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskExecutor.java | 100 +++++++++----------
1 file changed, 46 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d2a8e374/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 4c4b0a7..aa4d6d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -27,9 +27,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -771,55 +768,51 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
reservedSlots.add(offer);
}
- Future<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
- getResourceID(),
- reservedSlots,
- leaderId,
- taskManagerConfiguration.getTimeout());
-
- Future<Void> acceptedSlotsAcceptFuture = acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() {
- @Override
- public void accept(Iterable<SlotOffer> acceptedSlots) {
- // check if the response is still valid
- if (isJobManagerConnectionValid(jobId, leaderId)) {
- // mark accepted slots active
- for (SlotOffer acceptedSlot : acceptedSlots) {
- reservedSlots.remove(acceptedSlot);
- }
-
- final Exception e = new Exception("The slot was rejected by the JobManager.");
+ CompletableFuture<Iterable<SlotOffer>> acceptedSlotsFuture = FutureUtils.toJava(
+ jobMasterGateway.offerSlots(
+ getResourceID(),
+ reservedSlots,
+ leaderId,
+ taskManagerConfiguration.getTimeout()));
+
+ acceptedSlotsFuture.whenCompleteAsync(
+ (Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof TimeoutException) {
+ log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
+ // We ran into a timeout. Try again.
+ offerSlotsToJobManager(jobId);
+ } else {
+ log.warn("Slot offering to JobManager failed. Freeing the slots " +
+ "and returning them to the ResourceManager.", throwable);
- for (SlotOffer rejectedSlot: reservedSlots) {
- freeSlot(rejectedSlot.getAllocationId(), e);
+ // We encountered an exception. Free the slots and return them to the RM.
+ for (SlotOffer reservedSlot: reservedSlots) {
+ freeSlot(reservedSlot.getAllocationId(), throwable);
+ }
}
} else {
- // discard the response since there is a new leader for the job
- log.debug("Discard offer slot response since there is a new leader " +
- "for the job {}.", jobId);
- }
- }
- }, getMainThreadExecutor());
+ // check if the response is still valid
+ if (isJobManagerConnectionValid(jobId, leaderId)) {
+ // mark accepted slots active
+ for (SlotOffer acceptedSlot : acceptedSlots) {
+ reservedSlots.remove(acceptedSlot);
+ }
- acceptedSlotsAcceptFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
- @Override
- public Void apply(Throwable throwable) {
- if (throwable instanceof TimeoutException) {
- log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
- // We ran into a timeout. Try again.
- offerSlotsToJobManager(jobId);
- } else {
- log.warn("Slot offering to JobManager failed. Freeing the slots " +
- "and returning them to the ResourceManager.", throwable);
+ final Exception e = new Exception("The slot was rejected by the JobManager.");
- // We encountered an exception. Free the slots and return them to the RM.
- for (SlotOffer reservedSlot: reservedSlots) {
- freeSlot(reservedSlot.getAllocationId(), throwable);
+ for (SlotOffer rejectedSlot : reservedSlots) {
+ freeSlot(rejectedSlot.getAllocationId(), e);
+ }
+ } else {
+ // discard the response since there is a new leader for the job
+ log.debug("Discard offer slot response since there is a new leader " +
+ "for the job {}.", jobId);
}
}
+ },
+ getMainThreadExecutor());
- return null;
- }
- });
} else {
log.debug("There are no unassigned slots for the job {}.", jobId);
}
@@ -992,17 +985,16 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
{
final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
- Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(
- jobMasterLeaderId, taskExecutionState);
+ CompletableFuture<Acknowledge> futureAcknowledge = FutureUtils.toJava(
+ jobMasterGateway.updateTaskExecutionState(jobMasterLeaderId, taskExecutionState));
- futureAcknowledge.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
- @Override
- public Void apply(Throwable value) {
- failTask(executionAttemptID, value);
-
- return null;
- }
- }, getMainThreadExecutor());
+ futureAcknowledge.whenCompleteAsync(
+ (ack, throwable) -> {
+ if (throwable != null) {
+ failTask(executionAttemptID, throwable);
+ }
+ },
+ getMainThreadExecutor());
}
private void unregisterTaskAndNotifyFinalState(