You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/02 12:29:14 UTC
flink git commit: [FLINK-7320] [futures] Replace Flink's futures with Java 8's CompletableFuture in Scheduler
Repository: flink
Updated Branches:
refs/heads/master 9de270cb4 -> 32bc67e6c
[FLINK-7320] [futures] Replace Flink's futures with Java 8's CompletableFuture in Scheduler
Address PR comments
This closes #4435.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32bc67e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32bc67e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32bc67e6
Branch: refs/heads/master
Commit: 32bc67e6c64d3c0c6b49523f799a8b1fbb2dd80c
Parents: 9de270c
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 18:37:00 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Aug 2 11:18:19 2017 +0200
----------------------------------------------------------------------
.../webmonitor/StackTraceSampleCoordinator.java | 9 ++--
.../checkpoint/CheckpointCoordinator.java | 9 ++--
.../flink/runtime/concurrent/FutureUtils.java | 8 +---
.../flink/runtime/executiongraph/Execution.java | 7 +--
.../apache/flink/runtime/instance/SlotPool.java | 4 +-
.../flink/runtime/instance/SlotProvider.java | 7 +--
.../runtime/jobmanager/scheduler/Scheduler.java | 19 ++++----
.../ExecutionGraphMetricsTest.java | 3 +-
.../ExecutionGraphSchedulingTest.java | 49 ++++++++------------
.../ExecutionVertexSchedulingTest.java | 9 ++--
.../executiongraph/ProgrammedSlotProvider.java | 18 +++----
.../utils/SimpleSlotProvider.java | 10 ++--
.../scheduler/SchedulerIsolatedTasksTest.java | 18 ++++---
13 files changed, 74 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
index df15b48..26e8a93 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -125,18 +126,14 @@ public class StackTraceSampleCoordinator {
executions[i] = execution;
triggerIds[i] = execution.getAttemptId();
} else {
- CompletableFuture<StackTraceSample> result = new CompletableFuture();
- result.completeExceptionally(new IllegalStateException("Task " + tasksToSample[i]
+ return FutureUtils.completedExceptionally(new IllegalStateException("Task " + tasksToSample[i]
.getTaskNameWithSubtaskIndex() + " is not running."));
- return result;
}
}
synchronized (lock) {
if (isShutDown) {
- CompletableFuture<StackTraceSample> result = new CompletableFuture();
- result.completeExceptionally(new IllegalStateException("Shut down"));
- return result;
+ return FutureUtils.completedExceptionally(new IllegalStateException("Shut down"));
}
final int sampleId = sampleIdCounter++;
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5cab7f8..6f41867 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -381,9 +382,7 @@ public class CheckpointCoordinator {
result = triggerResult.getPendingCheckpoint().getCompletionFuture();
} else {
Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
- result = new CompletableFuture<>();
- result.completeExceptionally(cause);
- return result;
+ return FutureUtils.completedExceptionally(cause);
}
// Make sure to remove the created base directory on Exceptions
@@ -439,9 +438,7 @@ public class CheckpointCoordinator {
return triggerResult.getPendingCheckpoint().getCompletionFuture();
} else {
Throwable cause = new Exception("Failed to trigger checkpoint: " + triggerResult.getFailureReason().message());
- CompletableFuture<CompletedCheckpoint> failedResult = new CompletableFuture<>();
- failedResult.completeExceptionally(cause);
- return failedResult;
+ return FutureUtils.completedExceptionally(cause);
}
default:
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 70550ad..cf218c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -60,9 +60,7 @@ public class FutureUtils {
try {
operationResultFuture = operation.call();
} catch (Exception e) {
- java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
- exceptionResult.completeExceptionally(new RetryException("Could not execute the provided operation.", e));
- return exceptionResult;
+ return FutureUtils.completedExceptionally(new RetryException("Could not execute the provided operation.", e));
}
return operationResultFuture.handleAsync(
@@ -71,10 +69,8 @@ public class FutureUtils {
if (retries > 0) {
return retry(operation, retries - 1, executor);
} else {
- java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
- exceptionResult.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
+ return FutureUtils.<T>completedExceptionally(new RetryException("Could not complete the operation. Number of retries " +
"has been exhausted.", throwable));
- return exceptionResult;
}
} else {
return java.util.concurrent.CompletableFuture.completedFuture(t);
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 66dee0a..5cb12ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -359,7 +359,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
new ScheduledUnit(this, sharingGroup) :
new ScheduledUnit(this, sharingGroup, locationConstraint);
- return FutureUtils.toJava(slotProvider.allocateSlot(toSchedule, queued));
+ return slotProvider.allocateSlot(toSchedule, queued);
}
else {
// call race, already deployed, or already done
@@ -688,10 +688,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
maxStrackTraceDepth,
timeout));
} else {
- CompletableFuture<StackTraceSampleResponse> result = new CompletableFuture<>();
- result.completeExceptionally(new Exception("The execution has no slot assigned."));
-
- return result;
+ return FutureUtils.completedExceptionally(new Exception("The execution has no slot assigned."));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index c74d9a6..9a26779 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -980,11 +980,11 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
}
@Override
- public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+ public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
Iterable<TaskManagerLocation> locationPreferences =
task.getTaskToExecute().getVertex().getPreferredLocations();
- return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
+ return FutureUtils.toJava(gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index 919f6a1..23e6749 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -18,16 +18,17 @@
package org.apache.flink.runtime.instance;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import java.util.concurrent.CompletableFuture;
+
/**
* The slot provider is responsible for preparing slots for ready-to-run tasks.
*
* <p>It supports two allocating modes:
* <ul>
* <li>Immediate allocating: A request for a task slot immediately gets satisfied, we can call
- * {@link Future#getNow(Object)} to get the allocated slot.</li>
+ * {@link CompletableFuture#getNow(Object)} to get the allocated slot.</li>
* <li>Queued allocating: A request for a task slot is queued and returns a future that will be
* fulfilled as soon as a slot becomes available.</li>
* </ul>
@@ -41,5 +42,5 @@ public interface SlotProvider {
* @param allowQueued Whether allow the task be queued if we do not have enough resource
* @return The future of the allocation
*/
- Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued);
+ CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index af72d7c..5a7e819 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
@@ -37,9 +38,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -134,16 +133,16 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
@Override
- public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+ public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
try {
final Object ret = scheduleTask(task, allowQueued);
if (ret instanceof SimpleSlot) {
- return FlinkCompletableFuture.completed((SimpleSlot) ret);
+ return CompletableFuture.completedFuture((SimpleSlot) ret);
}
- else if (ret instanceof Future) {
+ else if (ret instanceof CompletableFuture) {
@SuppressWarnings("unchecked")
- Future<SimpleSlot> typed = (Future<SimpleSlot>) ret;
+ CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
return typed;
}
else {
@@ -152,12 +151,12 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
}
}
catch (NoResourceAvailableException e) {
- return FlinkCompletableFuture.completedExceptionally(e);
+ return FutureUtils.completedExceptionally(e);
}
}
/**
- * Returns either a {@link SimpleSlot}, or a {@link Future}.
+ * Returns either a {@link SimpleSlot}, or a {@link CompletableFuture}.
*/
private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
if (task == null) {
@@ -320,7 +319,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
else {
// no resource available now, so queue the request
if (queueIfNoResource) {
- CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+ CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
this.taskQueue.add(new QueuedTask(task, future));
return future;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 0785a26..bfcab87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -55,6 +55,7 @@ import org.mockito.Matchers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -113,7 +114,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
when(simpleSlot.getRoot()).thenReturn(rootSlot);
when(simpleSlot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
- FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+ CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
future.complete(simpleSlot);
when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index c2eea5c..d3086a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -52,13 +51,13 @@ import org.junit.After;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.mockito.verification.Timeout;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -110,8 +109,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final JobID jobId = new JobID();
final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
- final FlinkCompletableFuture<SimpleSlot> sourceFuture = new FlinkCompletableFuture<>();
- final FlinkCompletableFuture<SimpleSlot> targetFuture = new FlinkCompletableFuture<>();
+ final CompletableFuture<SimpleSlot> sourceFuture = new CompletableFuture<>();
+ final CompletableFuture<SimpleSlot> targetFuture = new CompletableFuture<>();
ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
@@ -178,9 +177,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
@SuppressWarnings({"unchecked", "rawtypes"})
- final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
+ final CompletableFuture<SimpleSlot>[] sourceFutures = new CompletableFuture[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
- final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
+ final CompletableFuture<SimpleSlot>[] targetFutures = new CompletableFuture[parallelism];
//
// Create the slots, futures, and the slot provider
@@ -198,8 +197,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
sourceSlots[i] = createSlot(sourceTaskManagers[i], jobId);
targetSlots[i] = createSlot(targetTaskManagers[i], jobId);
- sourceFutures[i] = new FlinkCompletableFuture<>();
- targetFutures[i] = new FlinkCompletableFuture<>();
+ sourceFutures[i] = new CompletableFuture<>();
+ targetFutures[i] = new CompletableFuture<>();
}
ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
@@ -284,16 +283,16 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
- final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
+ final CompletableFuture<SimpleSlot>[] sourceFutures = new CompletableFuture[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
- final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
+ final CompletableFuture<SimpleSlot>[] targetFutures = new CompletableFuture[parallelism];
for (int i = 0; i < parallelism; i++) {
sourceSlots[i] = createSlot(taskManager, jobId, slotOwner);
targetSlots[i] = createSlot(taskManager, jobId, slotOwner);
- sourceFutures[i] = new FlinkCompletableFuture<>();
- targetFutures[i] = new FlinkCompletableFuture<>();
+ sourceFutures[i] = new CompletableFuture<>();
+ targetFutures[i] = new CompletableFuture<>();
}
ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
@@ -359,11 +358,11 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
final SimpleSlot[] slots = new SimpleSlot[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
- final FlinkCompletableFuture<SimpleSlot>[] slotFutures = new FlinkCompletableFuture[parallelism];
+ final CompletableFuture<SimpleSlot>[] slotFutures = new CompletableFuture[parallelism];
for (int i = 0; i < parallelism; i++) {
slots[i] = createSlot(taskManager, jobId, slotOwner);
- slotFutures[i] = new FlinkCompletableFuture<>();
+ slotFutures[i] = new CompletableFuture<>();
}
ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
@@ -393,7 +392,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
// verify that no deployments have happened
verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
- for (Future<SimpleSlot> future : slotFutures) {
+ for (CompletableFuture<SimpleSlot> future : slotFutures) {
if (future.isDone()) {
assertTrue(future.get().isCanceled());
}
@@ -435,17 +434,13 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
createSlot(taskManager, jobId, recycler)));
when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
- new Answer<Future<SimpleSlot>>() {
-
- @Override
- public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+ (InvocationOnMock invocation) -> {
if (availableSlots.isEmpty()) {
throw new TestRuntimeException();
} else {
- return FlinkCompletableFuture.completed(availableSlots.remove(0));
+ return CompletableFuture.completedFuture(availableSlots.remove(0));
}
- }
- });
+ });
final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
@@ -513,17 +508,13 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final SlotProvider slotProvider = mock(SlotProvider.class);
when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
- new Answer<Future<SimpleSlot>>() {
-
- @Override
- public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+ (InvocationOnMock invocation) -> {
if (availableSlots.isEmpty()) {
throw new TestRuntimeException();
} else {
- return FlinkCompletableFuture.completed(availableSlots.remove(0));
+ return CompletableFuture.completedFuture(availableSlots.remove(0));
}
- }
- });
+ });
final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 1b029e8..4eac4aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.Instance;
@@ -33,6 +32,8 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
import org.mockito.Matchers;
+import java.util.concurrent.CompletableFuture;
+
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
import static org.junit.Assert.assertEquals;
@@ -59,7 +60,7 @@ public class ExecutionVertexSchedulingTest {
assertTrue(slot.isReleased());
Scheduler scheduler = mock(Scheduler.class);
- FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+ CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
future.complete(slot);
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
@@ -90,7 +91,7 @@ public class ExecutionVertexSchedulingTest {
slot.releaseSlot();
assertTrue(slot.isReleased());
- final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+ final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
Scheduler scheduler = mock(Scheduler.class);
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
@@ -125,7 +126,7 @@ public class ExecutionVertexSchedulingTest {
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
Scheduler scheduler = mock(Scheduler.class);
- FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+ CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
future.complete(slot);
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
index 3acb2eb..fef6aaa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -26,6 +25,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -36,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
class ProgrammedSlotProvider implements SlotProvider {
- private final Map<JobVertexID, Future<SimpleSlot>[]> slotFutures = new HashMap<>();
+ private final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>();
private final int parallelism;
@@ -45,15 +45,15 @@ class ProgrammedSlotProvider implements SlotProvider {
this.parallelism = parallelism;
}
- public void addSlot(JobVertexID vertex, int subtaskIndex, Future<SimpleSlot> future) {
+ public void addSlot(JobVertexID vertex, int subtaskIndex, CompletableFuture<SimpleSlot> future) {
checkNotNull(vertex);
checkNotNull(future);
checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
- Future<SimpleSlot>[] futures = slotFutures.get(vertex);
+ CompletableFuture<SimpleSlot>[] futures = slotFutures.get(vertex);
if (futures == null) {
@SuppressWarnings("unchecked")
- Future<SimpleSlot>[] newArray = (Future<SimpleSlot>[]) new Future<?>[parallelism];
+ CompletableFuture<SimpleSlot>[] newArray = (CompletableFuture<SimpleSlot>[]) new CompletableFuture<?>[parallelism];
futures = newArray;
slotFutures.put(vertex, futures);
}
@@ -61,7 +61,7 @@ class ProgrammedSlotProvider implements SlotProvider {
futures[subtaskIndex] = future;
}
- public void addSlots(JobVertexID vertex, Future<SimpleSlot>[] futures) {
+ public void addSlots(JobVertexID vertex, CompletableFuture<SimpleSlot>[] futures) {
checkNotNull(vertex);
checkNotNull(futures);
checkArgument(futures.length == parallelism);
@@ -70,13 +70,13 @@ class ProgrammedSlotProvider implements SlotProvider {
}
@Override
- public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+ public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
- Future<SimpleSlot>[] forTask = slotFutures.get(vertexId);
+ CompletableFuture<SimpleSlot>[] forTask = slotFutures.get(vertexId);
if (forTask != null) {
- Future<SimpleSlot> future = forTask[subtask];
+ CompletableFuture<SimpleSlot> future = forTask[subtask];
if (future != null) {
return future;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 2cf1eec..be5282a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -22,8 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotProvider;
@@ -36,6 +35,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.net.InetAddress;
import java.util.ArrayDeque;
+import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -70,7 +70,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
}
@Override
- public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+ public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
final AllocatedSlot slot;
synchronized (slots) {
@@ -83,10 +83,10 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
if (slot != null) {
SimpleSlot result = new SimpleSlot(slot, this, 0);
- return FlinkCompletableFuture.completed(result);
+ return CompletableFuture.completedFuture(result);
}
else {
- return FlinkCompletableFuture.completedExceptionally(new NoResourceAvailableException());
+ return FutureUtils.completedExceptionally(new NoResourceAvailableException());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 643efae..a05c1a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.jobmanager.scheduler;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -31,6 +29,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -197,7 +196,7 @@ public class SchedulerIsolatedTasksTest {
final int totalSlots = scheduler.getNumberOfAvailableSlots();
// all slots we ever got.
- List<Future<SimpleSlot>> allAllocatedSlots = new ArrayList<>();
+ List<CompletableFuture<SimpleSlot>> allAllocatedSlots = new ArrayList<>();
// slots that need to be released
final Set<SimpleSlot> toRelease = new HashSet<SimpleSlot>();
@@ -236,16 +235,15 @@ public class SchedulerIsolatedTasksTest {
disposeThread.start();
for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
- Future<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
- future.thenAcceptAsync(new AcceptFunction<SimpleSlot>() {
- @Override
- public void accept(SimpleSlot slot) {
+ CompletableFuture<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
+ future.thenAcceptAsync(
+ (SimpleSlot slot) -> {
synchronized (toRelease) {
toRelease.add(slot);
toRelease.notifyAll();
}
- }
- }, TestingUtils.defaultExecutionContext());
+ },
+ TestingUtils.defaultExecutionContext());
allAllocatedSlots.add(future);
}
@@ -254,7 +252,7 @@ public class SchedulerIsolatedTasksTest {
assertFalse("The slot releasing thread caused an error.", errored.get());
List<SimpleSlot> slotsAfter = new ArrayList<SimpleSlot>();
- for (Future<SimpleSlot> future : allAllocatedSlots) {
+ for (CompletableFuture<SimpleSlot> future : allAllocatedSlots) {
slotsAfter.add(future.get());
}