You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/27 16:40:32 UTC
[2/3] flink git commit: [FLINK-4690] Replace SlotAllocationFuture with flink's own future
[FLINK-4690] Replace SlotAllocationFuture with flink's own future
This closes #2552.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b88f1a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b88f1a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b88f1a7
Branch: refs/heads/master
Commit: 7b88f1a75ea92f6b26624a7358e7fcafa3e9506f
Parents: f8138f4
Author: Kurt Young <yk...@gmail.com>
Authored: Tue Sep 27 12:10:08 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 27 18:39:36 2016 +0200
----------------------------------------------------------------------
.../runtime/concurrent/impl/FlinkFuture.java | 1 -
.../flink/runtime/executiongraph/Execution.java | 55 ++---
.../flink/runtime/instance/SlotProvider.java | 6 +-
.../runtime/jobmanager/scheduler/Scheduler.java | 24 ++-
.../scheduler/SlotAllocationFuture.java | 146 --------------
.../scheduler/SlotAllocationFutureAction.java | 34 ----
.../ExecutionGraphMetricsTest.java | 9 +-
.../ExecutionVertexSchedulingTest.java | 19 +-
.../scheduler/SchedulerIsolatedTasksTest.java | 31 ++-
.../scheduler/SlotAllocationFutureTest.java | 200 -------------------
10 files changed, 80 insertions(+), 445 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index 361cd3d..3f2c5e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -104,7 +104,6 @@ public class FlinkFuture<T> implements Future<T> {
@Override
public T getNow(T valueIfAbsent) throws ExecutionException {
Preconditions.checkNotNull(scalaFuture);
- Preconditions.checkNotNull(valueIfAbsent);
Option<Try<T>> value = scalaFuture.value();
http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 6826365..8c02e1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,18 +20,18 @@ package org.apache.flink.runtime.executiongraph;
import akka.dispatch.OnComplete;
import akka.dispatch.OnFailure;
-
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
@@ -41,20 +41,18 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
-
import org.slf4j.Logger;
import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
+import scala.concurrent.ExecutionContext$;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
@@ -299,32 +297,43 @@ public class Execution {
// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
// in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
- final SlotAllocationFuture future = slotProvider.allocateSlot(toSchedule, queued);
+ final Future<SimpleSlot> future = slotProvider.allocateSlot(toSchedule, queued);
+
if (queued) {
- future.setFutureAction(new SlotAllocationFutureAction() {
+ future.handleAsync(new BiFunction<SimpleSlot, Throwable, Void>() {
@Override
- public void slotAllocated(SimpleSlot slot) {
- try {
- deployToSlot(slot);
- }
- catch (Throwable t) {
+ public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+ if (simpleSlot != null) {
try {
- slot.releaseSlot();
- } finally {
- markFailed(t);
+ deployToSlot(simpleSlot);
+ } catch (Throwable t) {
+ try {
+ simpleSlot.releaseSlot();
+ } finally {
+ markFailed(t);
+ }
}
}
+ else {
+ markFailed(throwable);
+ }
+ return null;
}
- });
+ }, ExecutionContext$.MODULE$.global());
}
else {
- SimpleSlot slot = future.get();
+ SimpleSlot slot = null;
try {
+ // when queueing is not allowed, we either get a slot here or a NoResourceAvailableException
+ // has already been thrown earlier (by allocateSlot).
+ slot = checkNotNull(future.getNow(null));
deployToSlot(slot);
}
catch (Throwable t) {
try {
- slot.releaseSlot();
+ if (slot != null) {
+ slot.releaseSlot();
+ }
} finally {
markFailed(t);
}
@@ -394,7 +403,7 @@ public class Execution {
final ActorGateway gateway = slot.getTaskManagerActorGateway();
- final Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);
+ final scala.concurrent.Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);
deployAction.onComplete(new OnComplete<Object>(){
@@ -436,7 +445,7 @@ public class Execution {
if (slot != null) {
final ActorGateway gateway = slot.getTaskManagerActorGateway();
- Future<Object> stopResult = gateway.retry(
+ scala.concurrent.Future<Object> stopResult = gateway.retry(
new StopTask(attemptId),
NUM_STOP_CALL_TRIES,
timeout,
@@ -916,7 +925,7 @@ public class Execution {
final ActorGateway gateway = slot.getTaskManagerActorGateway();
- Future<Object> cancelResult = gateway.retry(
+ scala.concurrent.Future<Object> cancelResult = gateway.retry(
new CancelTask(attemptId),
NUM_CANCEL_CALL_TRIES,
timeout,
@@ -965,7 +974,7 @@ public class Execution {
final ActorGateway gateway = consumerSlot.getTaskManagerActorGateway();
final TaskManagerLocation taskManagerLocation = consumerSlot.getTaskManagerLocation();
- Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout);
+ scala.concurrent.Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout);
futureUpdate.onFailure(new OnFailure() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index b2c23a5..49e6d9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.instance;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
/**
* The slot provider is responsible for preparing slots for ready-to-run tasks.
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
* <p>It supports two allocating modes:
* <ul>
* <li>Immediate allocating: A request for a task slot immediately gets satisfied, we can call
- * {@link SlotAllocationFuture#get()} to get the allocated slot.</li>
+ * {@link Future#getNow(Object)} to get the allocated slot.</li>
* <li>Queued allocating: A request for a task slot is queued and returns a future that will be
* fulfilled as soon as a slot becomes available.</li>
* </ul>
@@ -44,5 +44,5 @@ public interface SlotProvider {
*
* @throws NoResourceAvailableException
*/
- SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException;
+ Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index c9cdd00..ce2f6f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -39,6 +39,8 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -133,15 +135,17 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
@Override
- public SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued)
+ public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
throws NoResourceAvailableException {
final Object ret = scheduleTask(task, allowQueued);
if (ret instanceof SimpleSlot) {
- return new SlotAllocationFuture((SimpleSlot) ret);
+ FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+ future.complete((SimpleSlot) ret);
+ return future;
}
- else if (ret instanceof SlotAllocationFuture) {
- return (SlotAllocationFuture) ret;
+ else if (ret instanceof Future) {
+ return (Future) ret;
}
else {
throw new RuntimeException();
@@ -149,7 +153,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
}
/**
- * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link SlotAllocationFuture}.
+ * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link Future}.
*/
private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
if (task == null) {
@@ -312,7 +316,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
else {
// no resource available now, so queue the request
if (queueIfNoResource) {
- SlotAllocationFuture future = new SlotAllocationFuture();
+ FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
this.taskQueue.add(new QueuedTask(task, future));
return future;
}
@@ -560,7 +564,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
taskQueue.poll();
if (queued.getFuture() != null) {
try {
- queued.getFuture().setSlot(newSlot);
+ queued.getFuture().complete(newSlot);
}
catch (Throwable t) {
LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t);
@@ -829,10 +833,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
private final ScheduledUnit task;
- private final SlotAllocationFuture future;
+ private final FlinkCompletableFuture<SimpleSlot> future;
- public QueuedTask(ScheduledUnit task, SlotAllocationFuture future) {
+ public QueuedTask(ScheduledUnit task, FlinkCompletableFuture<SimpleSlot> future) {
this.task = task;
this.future = future;
}
@@ -841,7 +845,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
return task;
}
- public SlotAllocationFuture getFuture() {
+ public FlinkCompletableFuture<SimpleSlot> getFuture() {
return future;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
deleted file mode 100644
index 36e4072..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- *
- */
-public class SlotAllocationFuture {
-
- private final Object monitor = new Object();
-
- private volatile SimpleSlot slot;
-
- private volatile SlotAllocationFutureAction action;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a future that is uncompleted.
- */
- public SlotAllocationFuture() {}
-
- /**
- * Creates a future that is immediately completed.
- *
- * @param slot The task slot that completes the future.
- */
- public SlotAllocationFuture(SimpleSlot slot) {
- this.slot = slot;
- }
-
- // --------------------------------------------------------------------------------------------
-
- public SimpleSlot waitTillCompleted() throws InterruptedException {
- synchronized (monitor) {
- while (slot == null) {
- monitor.wait();
- }
- return slot;
- }
- }
-
- public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
- checkArgument(timeout >= 0, "timeout may not be negative");
- checkNotNull(timeUnit, "timeUnit");
-
- if (timeout == 0) {
- return waitTillCompleted();
- } else {
- final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
- long millisToWait;
-
- synchronized (monitor) {
- while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
- monitor.wait(millisToWait);
- }
-
- if (slot != null) {
- return slot;
- } else {
- throw new TimeoutException();
- }
- }
- }
- }
-
- /**
- * Gets the slot from this future. This method throws an exception, if the future has not been completed.
- * This method never blocks.
- *
- * @return The slot with which this future was completed.
- * @throws IllegalStateException Thrown, if this method is called before the future is completed.
- */
- public SimpleSlot get() {
- final SimpleSlot slot = this.slot;
- if (slot != null) {
- return slot;
- } else {
- throw new IllegalStateException("The future is not complete - not slot available");
- }
- }
-
- public void setFutureAction(SlotAllocationFutureAction action) {
- checkNotNull(action);
-
- synchronized (monitor) {
- checkState(this.action == null, "Future already has an action registered.");
-
- this.action = action;
-
- if (this.slot != null) {
- action.slotAllocated(this.slot);
- }
- }
- }
-
- /**
- * Completes the future with a slot.
- */
- public void setSlot(SimpleSlot slot) {
- checkNotNull(slot);
-
- synchronized (monitor) {
- checkState(this.slot == null, "The future has already been assigned a slot.");
-
- this.slot = slot;
- monitor.notifyAll();
-
- if (action != null) {
- action.slotAllocated(slot);
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return slot == null ? "PENDING" : "DONE";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
deleted file mode 100644
index f9d032f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-/**
- * An action that is invoked once a {@link SlotAllocationFuture} is triggered.
- */
-public interface SlotAllocationFutureAction {
-
- /**
- * This method is called as soon as the SlotAllocationFuture is triggered.
- *
- * @param slot The slot that has been allocated.
- */
- void slotAllocated(SimpleSlot slot);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index aa5925f..a58d910 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -29,11 +29,11 @@ import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
@@ -136,10 +136,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
when(simpleSlot.getRoot()).thenReturn(rootSlot);
- when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean()))
- .thenReturn(new SlotAllocationFuture(simpleSlot));
-
-
+ FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+ future.complete(simpleSlot);
+ when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
when(rootSlot.getSlotNumber()).thenReturn(0);
http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index c576ce5..104f4ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.Instance;
@@ -26,7 +27,6 @@ import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
@@ -58,8 +58,9 @@ public class ExecutionVertexSchedulingTest {
assertTrue(slot.isReleased());
Scheduler scheduler = mock(Scheduler.class);
- when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
- .thenReturn(new SlotAllocationFuture(slot));
+ FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+ future.complete(slot);
+ when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
// try to deploy to the slot
@@ -88,7 +89,7 @@ public class ExecutionVertexSchedulingTest {
slot.releaseSlot();
assertTrue(slot.isReleased());
- final SlotAllocationFuture future = new SlotAllocationFuture();
+ final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
Scheduler scheduler = mock(Scheduler.class);
when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
@@ -100,7 +101,10 @@ public class ExecutionVertexSchedulingTest {
// future has not yet a slot
assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
- future.setSlot(slot);
+ future.complete(slot);
+
+ // wait a second for the future's action to be executed
+ Thread.sleep(1000);
// will have failed
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
@@ -122,8 +126,9 @@ public class ExecutionVertexSchedulingTest {
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
Scheduler scheduler = mock(Scheduler.class);
- when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
- .thenReturn(new SlotAllocationFuture(slot));
+ FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+ future.complete(slot);
+ when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index d78f551..9c21533 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.jobmanager.scheduler;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -194,7 +196,7 @@ public class SchedulerIsolatedTasksTest {
final int totalSlots = scheduler.getNumberOfAvailableSlots();
// all slots we ever got.
- List<SlotAllocationFuture> allAllocatedSlots = new ArrayList<SlotAllocationFuture>();
+ List<Future<SimpleSlot>> allAllocatedSlots = new ArrayList<>();
// slots that need to be released
final Set<SimpleSlot> toRelease = new HashSet<SimpleSlot>();
@@ -202,17 +204,6 @@ public class SchedulerIsolatedTasksTest {
// flag to track errors in the concurrent thread
final AtomicBoolean errored = new AtomicBoolean(false);
-
- SlotAllocationFutureAction action = new SlotAllocationFutureAction() {
- @Override
- public void slotAllocated(SimpleSlot slot) {
- synchronized (toRelease) {
- toRelease.add(slot);
- toRelease.notifyAll();
- }
- }
- };
-
// thread to asynchronously release slots
Runnable disposer = new Runnable() {
@@ -244,8 +235,16 @@ public class SchedulerIsolatedTasksTest {
disposeThread.start();
for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
- SlotAllocationFuture future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
- future.setFutureAction(action);
+ Future<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
+ future.thenAcceptAsync(new AcceptFunction<SimpleSlot>() {
+ @Override
+ public void accept(SimpleSlot slot) {
+ synchronized (toRelease) {
+ toRelease.add(slot);
+ toRelease.notifyAll();
+ }
+ }
+ }, TestingUtils.defaultExecutionContext());
allAllocatedSlots.add(future);
}
@@ -254,8 +253,8 @@ public class SchedulerIsolatedTasksTest {
assertFalse("The slot releasing thread caused an error.", errored.get());
List<SimpleSlot> slotsAfter = new ArrayList<SimpleSlot>();
- for (SlotAllocationFuture future : allAllocatedSlots) {
- slotsAfter.add(future.waitTillCompleted());
+ for (Future<SimpleSlot> future : allAllocatedSlots) {
+ slotsAfter.add(future.get());
}
assertEquals("All instances should have available slots.", NUM_INSTANCES,
http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
deleted file mode 100644
index ea0d2cc..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-public class SlotAllocationFutureTest {
-
- @Test
- public void testInvalidActions() {
- try {
- final SlotAllocationFuture future = new SlotAllocationFuture();
-
- SlotAllocationFutureAction action = new SlotAllocationFutureAction() {
- @Override
- public void slotAllocated(SimpleSlot slot) {}
- };
-
- future.setFutureAction(action);
- try {
- future.setFutureAction(action);
- fail();
- } catch (IllegalStateException e) {
- // expected
- }
-
- final Instance instance1 = SchedulerTestUtils.getRandomInstance(1);
- final Instance instance2 = SchedulerTestUtils.getRandomInstance(1);
-
- final SimpleSlot slot1 = new SimpleSlot(new JobID(), instance1,
- instance1.getTaskManagerLocation(), 0, instance1.getActorGateway(), null, null);
- final SimpleSlot slot2 = new SimpleSlot(new JobID(), instance2,
- instance2.getTaskManagerLocation(), 0, instance2.getActorGateway(), null, null);
-
- future.setSlot(slot1);
- try {
- future.setSlot(slot2);
- fail();
- } catch (IllegalStateException e) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void setWithAction() {
- try {
-
- // action before the slot
- {
- final AtomicInteger invocations = new AtomicInteger();
-
- final Instance instance = SchedulerTestUtils.getRandomInstance(1);
-
- final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
- instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
-
- SlotAllocationFuture future = new SlotAllocationFuture();
-
- future.setFutureAction(new SlotAllocationFutureAction() {
- @Override
- public void slotAllocated(SimpleSlot slot) {
- assertEquals(thisSlot, slot);
- invocations.incrementAndGet();
- }
- });
-
- future.setSlot(thisSlot);
-
- assertEquals(1, invocations.get());
- }
-
- // slot before action
- {
- final AtomicInteger invocations = new AtomicInteger();
- final Instance instance = SchedulerTestUtils.getRandomInstance(1);
-
- final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
- instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
-
- SlotAllocationFuture future = new SlotAllocationFuture();
- future.setSlot(thisSlot);
-
- future.setFutureAction(new SlotAllocationFutureAction() {
- @Override
- public void slotAllocated(SimpleSlot slot) {
- assertEquals(thisSlot, slot);
- invocations.incrementAndGet();
- }
- });
-
- assertEquals(1, invocations.get());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void setSync() {
- try {
- // sync before setting the slot
- {
- final AtomicInteger invocations = new AtomicInteger();
- final AtomicBoolean error = new AtomicBoolean();
-
- final Instance instance = SchedulerTestUtils.getRandomInstance(1);
-
- final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
- instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
-
- final SlotAllocationFuture future = new SlotAllocationFuture();
-
-
- Runnable r = new Runnable() {
- @Override
- public void run() {
- try {
- SimpleSlot syncSlot = future.waitTillCompleted();
- if (syncSlot == null || syncSlot != thisSlot) {
- error.set(true);
- return;
- }
- invocations.incrementAndGet();
- }
- catch (Throwable t) {
- error.set(true);
- }
- }
- };
-
- Thread syncer = new Thread(r);
- syncer.start();
-
- // wait, and give the sync thread a chance to sync
- Thread.sleep(10);
- future.setSlot(thisSlot);
-
- syncer.join();
-
- assertFalse(error.get());
- assertEquals(1, invocations.get());
- }
-
- // setting slot before syncing
- {
- final Instance instance = SchedulerTestUtils.getRandomInstance(1);
-
- final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
- instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
- final SlotAllocationFuture future = new SlotAllocationFuture();
-
- future.setSlot(thisSlot);
-
- SimpleSlot retrieved = future.waitTillCompleted();
-
- assertNotNull(retrieved);
- assertEquals(thisSlot, retrieved);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}