You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 21:18:08 UTC

flink git commit: [FLINK-7332] [futures] Replace Flink's futures with Java 8's CompletableFuture in TaskExecutor

Repository: flink
Updated Branches:
  refs/heads/master a6c8953eb -> d2a8e3741


[FLINK-7332] [futures] Replace Flink's futures with Java 8's CompletableFuture in TaskExecutor

This closes #4448.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2a8e374
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2a8e374
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2a8e374

Branch: refs/heads/master
Commit: d2a8e37415eb34ca9cb8b2d8c22a33aa99b494a6
Parents: a6c8953
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Aug 1 10:46:40 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 23:13:56 2017 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      | 100 +++++++++----------
 1 file changed, 46 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d2a8e374/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 4c4b0a7..aa4d6d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -27,9 +27,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -771,55 +768,51 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 					reservedSlots.add(offer);
 				}
 
-				Future<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
-					getResourceID(),
-					reservedSlots,
-					leaderId,
-					taskManagerConfiguration.getTimeout());
-
-				Future<Void> acceptedSlotsAcceptFuture = acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() {
-					@Override
-					public void accept(Iterable<SlotOffer> acceptedSlots) {
-						// check if the response is still valid
-						if (isJobManagerConnectionValid(jobId, leaderId)) {
-							// mark accepted slots active
-							for (SlotOffer acceptedSlot : acceptedSlots) {
-								reservedSlots.remove(acceptedSlot);
-							}
-
-							final Exception e = new Exception("The slot was rejected by the JobManager.");
+				CompletableFuture<Iterable<SlotOffer>> acceptedSlotsFuture = FutureUtils.toJava(
+					jobMasterGateway.offerSlots(
+						getResourceID(),
+						reservedSlots,
+						leaderId,
+						taskManagerConfiguration.getTimeout()));
+
+				acceptedSlotsFuture.whenCompleteAsync(
+					(Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> {
+						if (throwable != null) {
+							if (throwable instanceof TimeoutException) {
+								log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
+								// We ran into a timeout. Try again.
+								offerSlotsToJobManager(jobId);
+							} else {
+								log.warn("Slot offering to JobManager failed. Freeing the slots " +
+									"and returning them to the ResourceManager.", throwable);
 
-							for (SlotOffer rejectedSlot: reservedSlots) {
-								freeSlot(rejectedSlot.getAllocationId(), e);
+								// We encountered an exception. Free the slots and return them to the RM.
+								for (SlotOffer reservedSlot: reservedSlots) {
+									freeSlot(reservedSlot.getAllocationId(), throwable);
+								}
 							}
 						} else {
-							// discard the response since there is a new leader for the job
-							log.debug("Discard offer slot response since there is a new leader " +
-								"for the job {}.", jobId);
-						}
-					}
-				}, getMainThreadExecutor());
+							// check if the response is still valid
+							if (isJobManagerConnectionValid(jobId, leaderId)) {
+								// mark accepted slots active
+								for (SlotOffer acceptedSlot : acceptedSlots) {
+									reservedSlots.remove(acceptedSlot);
+								}
 
-				acceptedSlotsAcceptFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
-					@Override
-					public Void apply(Throwable throwable) {
-						if (throwable instanceof TimeoutException) {
-							log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
-							// We ran into a timeout. Try again.
-							offerSlotsToJobManager(jobId);
-						} else {
-							log.warn("Slot offering to JobManager failed. Freeing the slots " +
-									"and returning them to the ResourceManager.", throwable);
+								final Exception e = new Exception("The slot was rejected by the JobManager.");
 
-							// We encountered an exception. Free the slots and return them to the RM.
-							for (SlotOffer reservedSlot: reservedSlots) {
-								freeSlot(reservedSlot.getAllocationId(), throwable);
+								for (SlotOffer rejectedSlot : reservedSlots) {
+									freeSlot(rejectedSlot.getAllocationId(), e);
+								}
+							} else {
+								// discard the response since there is a new leader for the job
+								log.debug("Discard offer slot response since there is a new leader " +
+									"for the job {}.", jobId);
 							}
 						}
+					},
+					getMainThreadExecutor());
 
-						return null;
-					}
-				});
 			} else {
 				log.debug("There are no unassigned slots for the job {}.", jobId);
 			}
@@ -992,17 +985,16 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	{
 		final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
 
-		Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(
-				jobMasterLeaderId, taskExecutionState);
+		CompletableFuture<Acknowledge> futureAcknowledge = FutureUtils.toJava(
+			jobMasterGateway.updateTaskExecutionState(jobMasterLeaderId, taskExecutionState));
 
-		futureAcknowledge.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-			@Override
-			public Void apply(Throwable value) {
-				failTask(executionAttemptID, value);
-
-				return null;
-			}
-		}, getMainThreadExecutor());
+		futureAcknowledge.whenCompleteAsync(
+			(ack, throwable) -> {
+				if (throwable != null) {
+					failTask(executionAttemptID, throwable);
+				}
+			},
+			getMainThreadExecutor());
 	}
 
 	private void unregisterTaskAndNotifyFinalState(