You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by bg...@apache.org on 2015/12/10 10:47:43 UTC
reef git commit: [REEF-502] Support Vortex Tasklet(s) cancellation by user
Repository: reef
Updated Branches:
refs/heads/master 766883f0c -> fe6194cac
[REEF-502] Support Vortex Tasklet(s) cancellation by user
JIRA:
[REEF-502](https://issues.apache.org/jira/browse/REEF-502)
Pull Request:
Closes #708
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/fe6194ca
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/fe6194ca
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/fe6194ca
Branch: refs/heads/master
Commit: fe6194cacbfa38714a295790fc302d5c86a01726
Parents: 766883f
Author: Andrew Chung <af...@gmail.com>
Authored: Fri Dec 4 17:38:34 2015 -0800
Committer: Byung-Gon Chun <bg...@apache.org>
Committed: Thu Dec 10 18:46:11 2015 +0900
----------------------------------------------------------------------
.../src/main/avro/VortexRequest.avsc | 18 +++-
.../reef-vortex/src/main/avro/WorkerReport.avsc | 22 ++--
.../apache/reef/vortex/api/VortexFuture.java | 101 +++++++++++++++++--
.../common/TaskletCancellationRequest.java | 43 ++++++++
.../vortex/common/TaskletCancelledReport.java | 46 +++++++++
.../vortex/common/TaskletExecutionRequest.java | 1 +
.../vortex/common/TaskletFailureReport.java | 1 +
.../reef/vortex/common/TaskletResultReport.java | 1 +
.../reef/vortex/common/VortexAvroUtils.java | 45 +++++++--
.../reef/vortex/common/VortexRequest.java | 8 +-
.../apache/reef/vortex/common/WorkerReport.java | 6 ++
.../reef/vortex/driver/DefaultVortexMaster.java | 24 ++++-
.../vortex/driver/FirstFitSchedulingPolicy.java | 6 ++
.../reef/vortex/driver/PendingTasklets.java | 2 +-
.../vortex/driver/RandomSchedulingPolicy.java | 8 ++
.../reef/vortex/driver/RunningWorkers.java | 86 ++++++++++++++--
.../reef/vortex/driver/SchedulingPolicy.java | 5 +
.../org/apache/reef/vortex/driver/Tasklet.java | 7 ++
.../apache/reef/vortex/driver/VortexDriver.java | 5 +
.../apache/reef/vortex/driver/VortexMaster.java | 13 +++
.../reef/vortex/driver/VortexWorkerManager.java | 13 +++
.../reef/vortex/evaluator/VortexWorker.java | 93 +++++++++++------
.../vortex/driver/DefaultVortexMasterTest.java | 52 ++++++++++
.../org/apache/reef/vortex/driver/TestUtil.java | 42 +++++++-
.../applications/vortex/VortexTestSuite.java | 4 +-
.../InfiniteLoopWithCancellationFunction.java | 44 ++++++++
.../cancellation/TaskletCancellationTest.java | 62 ++++++++++++
.../TaskletCancellationTestStart.java | 72 +++++++++++++
.../vortex/cancellation/package-info.java | 23 +++++
29 files changed, 780 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
index 2453761..bf4396e 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
@@ -30,10 +30,24 @@
{
"namespace": "org.apache.reef.vortex.common.avro",
"type": "record",
+ "name": "AvroTaskletCancellationRequest",
+ "fields": [{"name": "taskletId", "type": "int"}]
+ },
+ {
+ "namespace": "org.apache.reef.vortex.common.avro",
+ "type": "record",
"name": "AvroVortexRequest",
"fields": [
- {"name": "requestType", "type": {"type": "enum", "name": "AvroRequestType", "symbols": ["ExecuteTasklet"]}},
- {"name": "taskletExecutionRequest", "type": ["null", "AvroTaskletExecutionRequest"], "default": null}
+ {
+ "name": "requestType",
+ "type": {"type": "enum", "name": "AvroRequestType",
+ "symbols": ["ExecuteTasklet", "CancelTasklet"]}
+ },
+ {
+ "name": "taskletRequest",
+ "type": ["null", "AvroTaskletExecutionRequest", "AvroTaskletCancellationRequest"],
+ "default": null
+ }
]
}
]
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
index 19a655d..11adb12 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
@@ -29,6 +29,14 @@
{
"namespace": "org.apache.reef.vortex.common.avro",
"type": "record",
+ "name": "AvroTaskletCancelledReport",
+ "fields": [
+ {"name": "taskletId", "type": "int"}
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.vortex.common.avro",
+ "type": "record",
"name": "AvroTaskletFailureReport",
"fields": [
{"name": "taskletId", "type": "int"},
@@ -42,15 +50,15 @@
"fields": [
{
"name": "reportType",
- "type": {"type": "enum", "name": "AvroReportType", "symbols": ["TaskletResult", "TaskletFailure"]}
- },
- {
- "name": "taskletResult",
- "type": ["null", "AvroTaskletResultReport"], "default": null
+ "type": {
+ "type": "enum",
+ "name": "AvroReportType",
+ "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"]}
},
{
- "name": "taskletFailure",
- "type": ["null", "AvroTaskletFailureReport"], "default": null
+ "name": "taskletReport",
+ "type": ["null", "AvroTaskletResultReport", "AvroTaskletCancelledReport", "AvroTaskletFailureReport"],
+ "default": null
}
]
}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
index 34cf6b6..019eeb0 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
@@ -21,28 +21,37 @@ package org.apache.reef.vortex.api;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.util.Optional;
+import org.apache.reef.vortex.driver.VortexMaster;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* The interface between user code and submitted task.
*/
@Unstable
public final class VortexFuture<TOutput> implements Future<TOutput> {
+ private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName());
+
// userResult starts out as null. If not null => variable is set and tasklet returned.
// Otherwise tasklet has not completed.
private Optional<TOutput> userResult = null;
private Exception userException;
+ private AtomicBoolean cancelled = new AtomicBoolean(false);
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private final FutureCallback<TOutput> callbackHandler;
private final Executor executor;
+ private final VortexMaster vortexMaster;
+ private final int taskletId;
/**
* Creates a {@link VortexFuture}.
*/
@Private
- public VortexFuture(final Executor executor) {
- this(executor, null);
+ public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId) {
+ this(executor, vortexMaster, taskletId, null);
}
/**
@@ -50,25 +59,71 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
*/
@Private
public VortexFuture(final Executor executor,
+ final VortexMaster vortexMaster,
+ final int taskletId,
final FutureCallback<TOutput> callbackHandler) {
this.executor = executor;
+ this.vortexMaster = vortexMaster;
+ this.taskletId = taskletId;
this.callbackHandler = callbackHandler;
}
/**
- * TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user.
+ * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed.
+ * @return true if task did not start or was cancelled, false if task failed or completed
*/
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
- throw new UnsupportedOperationException("Cancel not yet supported");
+ try {
+ return cancel(mayInterruptIfRunning, Optional.<Long>empty(), Optional.<TimeUnit>empty());
+ } catch (final TimeoutException e) {
+ // This should never happen.
+ LOG.log(Level.WARNING, "Received a TimeoutException in VortexFuture.cancel(). Should not have occurred.");
+ return false;
+ }
}
/**
- * TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user.
+ * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed, or
+ * if the timeout has expired.
+ * @return true if task did not start or was cancelled, false if task failed or completed
+ */
+ public boolean cancel(final boolean mayInterruptIfRunning, final long timeout, final TimeUnit unit)
+ throws TimeoutException {
+ return cancel(mayInterruptIfRunning, Optional.of(timeout), Optional.of(unit));
+ }
+
+ private boolean cancel(final boolean mayInterruptIfRunning,
+ final Optional<Long> timeout,
+ final Optional<TimeUnit> unit) throws TimeoutException {
+ if (isDone()) {
+ return isCancelled();
+ }
+
+ vortexMaster.cancelTasklet(mayInterruptIfRunning, taskletId);
+
+ try {
+ if (timeout.isPresent() && unit.isPresent()) {
+ if (!countDownLatch.await(timeout.get(), unit.get())) {
+ throw new TimeoutException();
+ }
+ } else {
+ countDownLatch.await();
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return false;
+ }
+
+ return isCancelled();
+ }
+
+ /**
+ * @return true if the task is cancelled, false if not.
*/
@Override
public boolean isCancelled() {
- throw new UnsupportedOperationException("Cancel not yet supported");
+ return cancelled.get();
}
/**
@@ -88,8 +143,12 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
if (userResult != null) {
return userResult.get();
} else {
- assert userException != null;
- throw new ExecutionException(userException);
+ assert this.cancelled.get() || userException != null;
+ if (userException != null) {
+ throw new ExecutionException(userException);
+ }
+
+ throw new ExecutionException(new InterruptedException("Task was cancelled."));
}
}
@@ -106,8 +165,12 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
if (userResult != null) {
return userResult.get();
} else {
- assert userException != null;
- throw new ExecutionException(userException);
+ assert this.cancelled.get() || userException != null;
+ if (userException != null) {
+ throw new ExecutionException(userException);
+ }
+
+ throw new ExecutionException(new InterruptedException("Task was cancelled."));
}
}
@@ -130,6 +193,7 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
/**
* Called by VortexMaster to let the user know that the task threw an exception.
*/
+ @Private
public void threwException(final Exception exception) {
this.userException = exception;
if (callbackHandler != null) {
@@ -142,4 +206,21 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
}
this.countDownLatch.countDown();
}
+
+ /**
+ * Called by VortexMaster to let the user know that the task was cancelled.
+ */
+ @Private
+ public void cancelled() {
+ this.cancelled.set(true);
+ if (callbackHandler != null) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ callbackHandler.onFailure(new InterruptedException("VortexFuture has been cancelled on request."));
+ }
+ });
+ }
+ this.countDownLatch.countDown();
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
new file mode 100644
index 0000000..8f725a8
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * A {@link VortexRequest} to cancel tasklets.
+ */
+@Unstable
+public final class TaskletCancellationRequest implements VortexRequest {
+ private final int taskletId;
+
+ public TaskletCancellationRequest(final int taskletId) {
+ this.taskletId = taskletId;
+ }
+
+ @Override
+ public int getTaskletId() {
+ return taskletId;
+ }
+
+ @Override
+ public RequestType getType() {
+ return RequestType.CancelTasklet;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
new file mode 100644
index 0000000..b51219b
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * The report of a cancelled Tasklet.
+ */
+@Unstable
+public final class TaskletCancelledReport implements WorkerReport {
+ private int taskletId;
+
+ /**
+ * @param taskletId of the cancelled tasklet.
+ */
+ public TaskletCancelledReport(final int taskletId) {
+ this.taskletId = taskletId;
+ }
+
+ @Override
+ public WorkerReportType getType() {
+ return WorkerReportType.TaskletCancelled;
+ }
+
+ @Override
+ public int getTaskletId() {
+ return taskletId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
index 961b574..8e43e4b 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
@@ -62,6 +62,7 @@ public final class TaskletExecutionRequest<TInput extends Serializable, TOutput
/**
* Get id of the tasklet.
*/
+ @Override
public int getTaskletId() {
return taskletId;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
index dc847e7..96df2e1 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
@@ -48,6 +48,7 @@ public final class TaskletFailureReport implements WorkerReport {
/**
* @return the id of the tasklet.
*/
+ @Override
public int getTaskletId() {
return taskletId;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
index 0a6fab3..cd3a597 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
@@ -50,6 +50,7 @@ public final class TaskletResultReport<TOutput extends Serializable> implements
/**
* @return the id of the tasklet.
*/
+ @Override
public int getTaskletId() {
return taskletId;
}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
index c3d01de..4ab4cb5 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -54,7 +54,7 @@ public final class VortexAvroUtils {
final byte[] serializedFunction = SerializationUtils.serialize(taskletExecutionRequest.getFunction());
avroVortexRequest = AvroVortexRequest.newBuilder()
.setRequestType(AvroRequestType.ExecuteTasklet)
- .setTaskletExecutionRequest(
+ .setTaskletRequest(
AvroTaskletExecutionRequest.newBuilder()
.setTaskletId(taskletExecutionRequest.getTaskletId())
.setSerializedInput(ByteBuffer.wrap(serializedInput))
@@ -62,6 +62,16 @@ public final class VortexAvroUtils {
.build())
.build();
break;
+ case CancelTasklet:
+ final TaskletCancellationRequest taskletCancellationRequest = (TaskletCancellationRequest) vortexRequest;
+ avroVortexRequest = AvroVortexRequest.newBuilder()
+ .setRequestType(AvroRequestType.CancelTasklet)
+ .setTaskletRequest(
+ AvroTaskletCancellationRequest.newBuilder()
+ .setTaskletId(taskletCancellationRequest.getTaskletId())
+ .build())
+ .build();
+ break;
default:
throw new RuntimeException("Undefined message type");
}
@@ -85,19 +95,29 @@ public final class VortexAvroUtils {
final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult());
avroWorkerReport = AvroWorkerReport.newBuilder()
.setReportType(AvroReportType.TaskletResult)
- .setTaskletResult(
+ .setTaskletReport(
AvroTaskletResultReport.newBuilder()
.setTaskletId(taskletResultReport.getTaskletId())
.setSerializedOutput(ByteBuffer.wrap(serializedOutput))
.build())
.build();
break;
+ case TaskletCancelled:
+ final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) workerReport;
+ avroWorkerReport = AvroWorkerReport.newBuilder()
+ .setReportType(AvroReportType.TaskletCancelled)
+ .setTaskletReport(
+ AvroTaskletCancelledReport.newBuilder()
+ .setTaskletId(workerReport.getTaskletId())
+ .build())
+ .build();
+ break;
case TaskletFailure:
final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException());
avroWorkerReport = AvroWorkerReport.newBuilder()
.setReportType(AvroReportType.TaskletFailure)
- .setTaskletFailure(
+ .setTaskletReport(
AvroTaskletFailureReport.newBuilder()
.setTaskletId(taskletFailureReport.getTaskletId())
.setSerializedException(ByteBuffer.wrap(serializedException))
@@ -123,7 +143,8 @@ public final class VortexAvroUtils {
final VortexRequest vortexRequest;
switch (avroVortexRequest.getRequestType()) {
case ExecuteTasklet:
- final AvroTaskletExecutionRequest taskletExecutionRequest = avroVortexRequest.getTaskletExecutionRequest();
+ final AvroTaskletExecutionRequest taskletExecutionRequest =
+ (AvroTaskletExecutionRequest)avroVortexRequest.getTaskletRequest();
// TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction
final VortexFunction function =
(VortexFunction) SerializationUtils.deserialize(
@@ -134,6 +155,11 @@ public final class VortexAvroUtils {
taskletExecutionRequest.getSerializedInput().array());
vortexRequest = new TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function, input);
break;
+ case CancelTasklet:
+ final AvroTaskletCancellationRequest taskletCancellationRequest =
+ (AvroTaskletCancellationRequest)avroVortexRequest.getTaskletRequest();
+ vortexRequest = new TaskletCancellationRequest(taskletCancellationRequest.getTaskletId());
+ break;
default:
throw new RuntimeException("Undefined VortexRequest type");
}
@@ -150,14 +176,21 @@ public final class VortexAvroUtils {
final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class);
switch (avroWorkerReport.getReportType()) {
case TaskletResult:
- final AvroTaskletResultReport taskletResultReport = avroWorkerReport.getTaskletResult();
+ final AvroTaskletResultReport taskletResultReport =
+ (AvroTaskletResultReport)avroWorkerReport.getTaskletReport();
// TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
final Serializable output =
(Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
workerReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
break;
+ case TaskletCancelled:
+ final AvroTaskletCancelledReport taskletCancelledReport =
+ (AvroTaskletCancelledReport)avroWorkerReport.getTaskletReport();
+ workerReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId());
+ break;
case TaskletFailure:
- final AvroTaskletFailureReport taskletFailureReport = avroWorkerReport.getTaskletFailure();
+ final AvroTaskletFailureReport taskletFailureReport =
+ (AvroTaskletFailureReport)avroWorkerReport.getTaskletReport();
final Exception exception =
(Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
workerReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
index 51061c2..5d59a96 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
@@ -31,10 +31,16 @@ public interface VortexRequest extends Serializable {
* Type of Request.
*/
enum RequestType {
- ExecuteTasklet
+ ExecuteTasklet,
+ CancelTasklet
}
/**
+ * @return the ID of the VortexTasklet associated with this VortexRequest.
+ */
+ int getTaskletId();
+
+ /**
* @return the type of this VortexRequest.
*/
RequestType getType();
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
index 7e320cb..192299a 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
@@ -32,6 +32,7 @@ public interface WorkerReport extends Serializable {
*/
enum WorkerReportType {
TaskletResult,
+ TaskletCancelled,
TaskletFailure
}
@@ -39,4 +40,9 @@ public interface WorkerReport extends Serializable {
* @return the type of this WorkerReport.
*/
WorkerReportType getType();
+
+ /**
+ * @return the taskletId of this WorkerReport.
+ */
+ int getTaskletId();
}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index a4b5ef0..cb62049 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -66,17 +66,27 @@ final class DefaultVortexMaster implements VortexMaster {
final Optional<FutureCallback<TOutput>> callback) {
// TODO[REEF-500]: Simple duplicate Vortex Tasklet launch.
final VortexFuture<TOutput> vortexFuture;
+ final int id = taskletIdCounter.getAndIncrement();
if (callback.isPresent()) {
- vortexFuture = new VortexFuture<>(executor, callback.get());
+ vortexFuture = new VortexFuture<>(executor, this, id, callback.get());
} else {
- vortexFuture = new VortexFuture<>(executor);
+ vortexFuture = new VortexFuture<>(executor, this, id);
}
- this.pendingTasklets.addLast(new Tasklet<>(taskletIdCounter.getAndIncrement(), function, input, vortexFuture));
+ final Tasklet tasklet = new Tasklet<>(id, function, input, vortexFuture);
+ this.pendingTasklets.addLast(tasklet);
return vortexFuture;
}
/**
+ * Cancels tasklets on the running workers.
+ */
+ @Override
+ public void cancelTasklet(final boolean mayInterruptIfRunning, final int taskletId) {
+ this.runningWorkers.cancelTasklet(mayInterruptIfRunning, taskletId);
+ }
+
+ /**
* Add a new worker to runningWorkers.
*/
@Override
@@ -116,6 +126,14 @@ final class DefaultVortexMaster implements VortexMaster {
}
/**
+ * Notify tasklet cancellation to runningWorkers.
+ */
+ @Override
+ public void taskletCancelled(final String workerId, final int taskletId) {
+ runningWorkers.taskletCancelled(workerId, taskletId);
+ }
+
+ /**
* Terminate the job.
*/
@Override
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
index 6dfd8bb..8b28eab 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
@@ -143,6 +143,12 @@ class FirstFitSchedulingPolicy implements SchedulingPolicy {
removeTasklet(workerId);
}
+ @Override
+ public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
+ final String workerId = vortexWorker.getId();
+ removeTasklet(workerId);
+ }
+
private void removeTasklet(final String workerId) {
if (idLoadMap.containsKey(workerId)) {
idLoadMap.put(workerId, Math.max(0, idLoadMap.get(workerId) - 1));
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
index eac4fc6..9b3e318 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
@@ -48,4 +48,4 @@ final class PendingTasklets {
Tasklet takeFirst() throws InterruptedException {
return pendingTasklets.takeFirst();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
index 2f71cd7..ffa2fbf 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
@@ -102,4 +102,12 @@ class RandomSchedulingPolicy implements SchedulingPolicy {
// Do nothing
}
+ /**
+ * Do nothing.
+ */
+ @Override
+ public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
+ // Do nothing
+ }
+
}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
index eebac5f..7b05c0c 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
@@ -30,6 +30,8 @@ import java.util.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* Keeps track of all running VortexWorkers and Tasklets.
@@ -38,8 +40,12 @@ import java.util.concurrent.locks.ReentrantLock;
@ThreadSafe
@DriverSide
final class RunningWorkers {
+ private static final Logger LOG = Logger.getLogger(RunningWorkers.class.getName());
+
// RunningWorkers and its locks
private final HashMap<String, VortexWorkerManager> runningWorkers = new HashMap<>(); // Running workers/tasklets
+ private final Set<Integer> taskletsToCancel = new HashSet<>();
+
private final Lock lock = new ReentrantLock();
private final Condition noWorkerOrResource = lock.newCondition();
@@ -132,6 +138,14 @@ final class RunningWorkers {
}
}
+ // TODO[JIRA REEF-500]: Will need to support duplicate tasklets.
+ if (taskletsToCancel.contains(tasklet.getId())) {
+ tasklet.cancelled();
+ taskletsToCancel.remove(tasklet.getId());
+ LOG.log(Level.FINE, "Cancelled tasklet {0}.", tasklet.getId());
+ return;
+ }
+
final VortexWorkerManager vortexWorkerManager = runningWorkers.get(workerId.get());
vortexWorkerManager.launchTasklet(tasklet);
schedulingPolicy.taskletLaunched(vortexWorkerManager, tasklet);
@@ -143,6 +157,30 @@ final class RunningWorkers {
/**
* Concurrency: Called by multiple threads.
+ * Parameter: Same taskletId can come in multiple times.
+ */
+ void cancelTasklet(final boolean mayInterruptIfRunning, final int taskletId) {
+ lock.lock();
+ try {
+ // This is not ideal since we are using a linear time search on all the workers.
+ final String workerId = getWhereTaskletWasScheduledTo(taskletId);
+ if (workerId == null) {
+ // launchTasklet called but not yet running.
+ taskletsToCancel.add(taskletId);
+ return;
+ }
+
+ if (mayInterruptIfRunning) {
+ LOG.log(Level.FINE, "Cancelling running Tasklet with ID {0}.", taskletId);
+ runningWorkers.get(workerId).cancelTasklet(taskletId);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Concurrency: Called by multiple threads.
* Parameter: Same arguments can come in multiple times.
* (e.g. preemption message coming before tasklet completion message multiple times)
*/
@@ -158,6 +196,8 @@ final class RunningWorkers {
// Notify (possibly) waiting scheduler
noWorkerOrResource.signal();
+
+ taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
}
} finally {
lock.unlock();
@@ -181,6 +221,32 @@ final class RunningWorkers {
// Notify (possibly) waiting scheduler
noWorkerOrResource.signal();
+
+ taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Concurrency: Called by multiple threads.
+ * Parameter: Same arguments can come in multiple times.
+ * (e.g. preemption message coming before tasklet cancellation message multiple times)
+ */
+ void taskletCancelled(final String workerId,
+ final int taskletId) {
+ lock.lock();
+ try {
+ if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before
+ final VortexWorkerManager worker = this.runningWorkers.get(workerId);
+ final Tasklet tasklet = worker.taskletCancelled(taskletId);
+ this.schedulingPolicy.taskletCancelled(worker, tasklet);
+
+ // Notify (possibly) waiting scheduler
+ noWorkerOrResource.signal();
+
+ taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
}
} finally {
lock.unlock();
@@ -209,17 +275,8 @@ final class RunningWorkers {
return terminated;
}
- ///////////////////////////////////////// For Tests Only
-
- /**
- * For unit tests to check whether the worker is running.
- */
- boolean isWorkerRunning(final String workerId) {
- return runningWorkers.containsKey(workerId);
- }
-
/**
- * For unit tests to see where a tasklet is scheduled to.
+ * Find where a tasklet is scheduled to.
* @param taskletId id of the tasklet in question
* @return id of the worker (null if the tasklet was not scheduled to any worker)
*/
@@ -233,4 +290,13 @@ final class RunningWorkers {
}
return null;
}
+
+ ///////////////////////////////////////// For Tests Only
+
+ /**
+ * For unit tests to check whether the worker is running.
+ */
+ boolean isWorkerRunning(final String workerId) {
+ return runningWorkers.containsKey(workerId);
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
index cef1cb6..a058f6f 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
@@ -57,4 +57,9 @@ interface SchedulingPolicy {
* Tasklet failed.
*/
void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
+
+ /**
+ * Tasklet cancelled.
+ */
+ void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
index 6cd6c44..b0138d0 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
@@ -80,6 +80,13 @@ class Tasklet<TInput extends Serializable, TOutput extends Serializable> {
}
/**
+ * Called by VortexMaster to let the user know that the task has been cancelled.
+ */
+ void cancelled(){
+ vortexFuture.cancelled();
+ }
+
+ /**
* For tests.
*/
boolean isCompleted() {
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index 352cdbd..75674cc 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -28,6 +28,7 @@ import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.vortex.api.VortexStart;
+import org.apache.reef.vortex.common.TaskletCancelledReport;
import org.apache.reef.vortex.common.TaskletFailureReport;
import org.apache.reef.vortex.common.TaskletResultReport;
import org.apache.reef.vortex.common.VortexAvroUtils;
@@ -160,6 +161,10 @@ final class VortexDriver {
final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult());
break;
+ case TaskletCancelled:
+ final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport)workerReport;
+ vortexMaster.taskletCancelled(workerId, taskletCancelledReport.getTaskletId());
+ break;
case TaskletFailure:
final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(),
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
index 5c7f684..38bbcc5 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -44,6 +44,14 @@ public interface VortexMaster {
final Optional<FutureCallback<TOutput>> callback);
/**
+ * Call this when a Tasklet is to be cancelled.
+ * @param mayInterruptIfRunning if true, will attempt to cancel running Tasklets; otherwise will only
+ * prevent a pending Tasklet from running.
+ * @param taskletId the ID of the Tasklet.
+ */
+ void cancelTasklet(final boolean mayInterruptIfRunning, final int taskletId);
+
+ /**
* Call this when a new worker is up and running.
*/
void workerAllocated(final VortexWorkerManager vortexWorkerManager);
@@ -64,6 +72,11 @@ public interface VortexMaster {
void taskletErrored(final String workerId, final int taskletId, final Exception exception);
/**
+ * Call this when a Tasklet is cancelled and the cancellation is honored.
+ */
+ void taskletCancelled(final String workerId, final int taskletId);
+
+ /**
* Release all resources and shut down.
*/
void terminate();
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
index f019668..76d6cec 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
@@ -21,6 +21,7 @@ package org.apache.reef.vortex.driver;
import net.jcip.annotations.NotThreadSafe;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.vortex.common.TaskletCancellationRequest;
import org.apache.reef.vortex.common.TaskletExecutionRequest;
import java.io.Serializable;
@@ -51,6 +52,11 @@ class VortexWorkerManager {
vortexRequestor.send(reefTask, taskletExecutionRequest);
}
+ void cancelTasklet(final int taskletId) {
+ final TaskletCancellationRequest cancellationRequest = new TaskletCancellationRequest(taskletId);
+ vortexRequestor.send(reefTask, cancellationRequest);
+ }
+
<TOutput extends Serializable> Tasklet taskletCompleted(final Integer taskletId, final TOutput result) {
final Tasklet<?, TOutput> tasklet = runningTasklets.remove(taskletId);
assert tasklet != null; // Tasklet should complete/error only once
@@ -65,6 +71,13 @@ class VortexWorkerManager {
return tasklet;
}
+ Tasklet taskletCancelled(final Integer taskletId) {
+ final Tasklet tasklet = runningTasklets.remove(taskletId);
+ assert tasklet != null; // Tasklet should finish only once.
+ tasklet.cancelled();
+ return tasklet;
+ }
+
Collection<Tasklet> removed() {
return runningTasklets.isEmpty() ? null : runningTasklets.values();
}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index e65830c..10d23a8 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -36,6 +36,8 @@ import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
import java.io.Serializable;
import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* Receives commands from VortexMaster, executes them, and returns the results.
@@ -45,6 +47,7 @@ import java.util.concurrent.*;
@Unit
@TaskSide
public final class VortexWorker implements Task, TaskMessageSource {
+ private static final Logger LOG = Logger.getLogger(VortexWorker.class.getName());
private static final String MESSAGE_SOURCE_ID = ""; // empty string as there is no use for it
private final BlockingDeque<byte[]> pendingRequests = new LinkedBlockingDeque<>();
@@ -68,6 +71,7 @@ public final class VortexWorker implements Task, TaskMessageSource {
public byte[] call(final byte[] memento) throws Exception {
final ExecutorService schedulerThread = Executors.newSingleThreadExecutor();
final ExecutorService commandExecutor = Executors.newFixedThreadPool(numOfThreads);
+ final ConcurrentMap<Integer, Future> futures = new ConcurrentHashMap<>();
// Scheduling thread starts
schedulerThread.execute(new Runnable() {
@@ -82,37 +86,66 @@ public final class VortexWorker implements Task, TaskMessageSource {
throw new RuntimeException(e);
}
- // Scheduler Thread: Pass the command to the worker thread pool to be executed
- commandExecutor.execute(new Runnable() {
- @Override
- public void run() {
- // Command Executor: Deserialize the command
- final VortexRequest vortexRequest = VortexAvroUtils.toVortexRequest(message);
- switch (vortexRequest.getType()) {
- case ExecuteTasklet:
- final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
- try {
- // Command Executor: Execute the command
- final Serializable result = taskletExecutionRequest.execute();
-
- // Command Executor: Tasklet successfully returns result
- final WorkerReport report =
- new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
- workerReports.addLast(VortexAvroUtils.toBytes(report));
- } catch (Exception e) {
- // Command Executor: Tasklet throws an exception
- final WorkerReport report =
- new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
- workerReports.addLast(VortexAvroUtils.toBytes(report));
- }
-
- heartBeatTriggerManager.triggerHeartBeat();
- break;
- default:
- throw new RuntimeException("Unknown Command");
+ // Scheduler Thread: Deserialize the command
+ final VortexRequest vortexRequest = VortexAvroUtils.toVortexRequest(message);
+
+ switch (vortexRequest.getType()) {
+ case ExecuteTasklet:
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // Scheduler Thread: Pass the command to the worker thread pool to be executed
+ // Record future to support cancellation.
+ futures.put(
+ vortexRequest.getTaskletId(),
+ commandExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
+ try {
+ // Command Executor: Execute the command
+ final Serializable result = taskletExecutionRequest.execute();
+ final WorkerReport report =
+ new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
+ workerReports.addLast(VortexAvroUtils.toBytes(report));
+ } catch (final InterruptedException ex) {
+ // Assumes that the user's tasklet code follows the convention of throwing
+ // InterruptedException when its thread is interrupted by Future.cancel(true).
+ final WorkerReport report = new TaskletCancelledReport(taskletExecutionRequest.getTaskletId());
+ LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", vortexRequest.getTaskletId());
+ workerReports.addLast(VortexAvroUtils.toBytes(report));
+ } catch (Exception e) {
+ // Command Executor: Tasklet throws an exception
+ final WorkerReport report =
+ new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
+ workerReports.addLast(VortexAvroUtils.toBytes(report));
+ }
+
+ try {
+ latch.await();
+ } catch (final InterruptedException e) {
+ LOG.log(Level.SEVERE, "Cannot wait for Future to be put.");
+ throw new RuntimeException(e);
+ }
+
+ futures.remove(vortexRequest.getTaskletId());
+ heartBeatTriggerManager.triggerHeartBeat();
+ }
+ }));
+
+ // Signal that future is put.
+ latch.countDown();
+ break;
+ case CancelTasklet:
+ LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", vortexRequest.getTaskletId());
+ final Future future = futures.get(vortexRequest.getTaskletId());
+ if (future != null) {
+ future.cancel(true);
}
- }
- });
+
+ break;
+ default:
+ throw new RuntimeException("Unknown Command");
+ }
}
}
});
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
index 37298fa..6c2162a 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
@@ -26,6 +26,8 @@ import org.junit.Test;
import java.util.*;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.*;
@@ -204,6 +206,56 @@ public class DefaultVortexMasterTest {
}
/**
+ * Test handling of single tasklet execution with a cancellation after launch.
+ */
+ @Test(timeout = 10000)
+ public void testSingleTaskletCancellation() throws Exception {
+ final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy());
+ final PendingTasklets pendingTasklets = new PendingTasklets();
+ final VortexFuture future = createTaskletCancellationFuture(runningWorkers, pendingTasklets);
+ launchTasklets(runningWorkers, pendingTasklets, 1);
+
+ assertTrue(future.cancel(true));
+ assertTrue("The VortexFuture should be cancelled.", future.isCancelled());
+ assertTrue("The VortexFuture should be done", future.isDone());
+ }
+
+ /**
+ * Test handling of single tasklet execution with a cancellation before launch.
+ */
+ @Test(timeout = 10000)
+ public void testSingleTaskletCancellationBeforeLaunch() throws Exception {
+
+ final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy());
+ final PendingTasklets pendingTasklets = new PendingTasklets();
+ final VortexFuture future = createTaskletCancellationFuture(runningWorkers, pendingTasklets);
+
+ try {
+ future.cancel(true, 100, TimeUnit.MILLISECONDS);
+ fail();
+ } catch (final TimeoutException e) {
+ // TimeoutException is expected.
+ }
+
+ launchTasklets(runningWorkers, pendingTasklets, 1);
+ assertTrue(future.cancel(true));
+ assertTrue("The VortexFuture should be cancelled.", future.isCancelled());
+ assertTrue("The VortexFuture should be done", future.isDone());
+ }
+
+ private VortexFuture createTaskletCancellationFuture(final RunningWorkers runningWorkers,
+ final PendingTasklets pendingTasklets) {
+ final VortexFunction vortexFunction = testUtil.newInfiniteLoopFunction();
+ final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
+
+ final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
+
+ // Allocate worker & tasklet and schedule
+ vortexMaster.workerAllocated(vortexWorkerManager1);
+ return vortexMaster.enqueueTasklet(vortexFunction, null, Optional.<FutureCallback<Integer>>empty());
+ }
+
+ /**
* Launch specified number of tasklets as a substitute for PendingTaskletLauncher.
* @return ids of launched tasklets
*/
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
index fc333e3..8704b0b 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
@@ -21,12 +21,18 @@ package org.apache.reef.vortex.driver;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.common.TaskletCancellationRequest;
+import org.apache.reef.vortex.common.VortexRequest;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.io.Serializable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,6 +43,7 @@ public final class TestUtil {
private final AtomicInteger taskletId = new AtomicInteger(0);
private final AtomicInteger workerId = new AtomicInteger(0);
private final Executor executor = Executors.newFixedThreadPool(5);
+ private final VortexMaster vortexMaster = mock(VortexMaster.class);
/**
* @return a new mocked worker.
@@ -45,14 +52,28 @@ public final class TestUtil {
final RunningTask reefTask = mock(RunningTask.class);
when(reefTask.getId()).thenReturn("worker" + String.valueOf(workerId.getAndIncrement()));
final VortexRequestor vortexRequestor = mock(VortexRequestor.class);
- return new VortexWorkerManager(vortexRequestor, reefTask);
+ final VortexWorkerManager workerManager = new VortexWorkerManager(vortexRequestor, reefTask);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ final VortexRequest request = (VortexRequest)invocation.getArguments()[1];
+ if (request instanceof TaskletCancellationRequest) {
+ workerManager.taskletCancelled(request.getTaskletId());
+ }
+
+ return null;
+ }
+ }).when(vortexRequestor).send(any(RunningTask.class), any(VortexRequest.class));
+
+ return workerManager;
}
/**
* @return a new dummy tasklet.
*/
public Tasklet newTasklet() {
- return new Tasklet(taskletId.getAndIncrement(), null, null, new VortexFuture(executor));
+ final int id = taskletId.getAndIncrement();
+ return new Tasklet(id, null, null, new VortexFuture(executor, vortexMaster, id));
}
/**
@@ -68,6 +89,23 @@ public final class TestUtil {
}
/**
+ * @return a new function that loops until its thread is interrupted, then throws InterruptedException.
+ */
+ public VortexFunction newInfiniteLoopFunction() {
+ return new VortexFunction() {
+ @Override
+ public Serializable call(final Serializable serializable) throws Exception {
+ while(true) {
+ Thread.sleep(100);
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+ }
+ }
+ };
+ }
+
+ /**
* @return a dummy integer-integer function.
*/
public VortexFunction<Integer, Integer> newIntegerFunction() {
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
index f9068cd..fca17f9 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
@@ -20,13 +20,15 @@ package org.apache.reef.tests.applications.vortex;
import org.apache.reef.tests.applications.vortex.addone.AddOneTest;
import org.apache.reef.tests.applications.vortex.exception.VortexExceptionTest;
+import org.apache.reef.tests.applications.vortex.cancellation.TaskletCancellationTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses({
AddOneTest.class,
- VortexExceptionTest.class
+ VortexExceptionTest.class,
+ TaskletCancellationTest.class
})
public final class VortexTestSuite {
}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
new file mode 100644
index 0000000..f1c982e
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.tests.applications.vortex.cancellation;
+
+import org.apache.reef.vortex.api.VortexFunction;
+
+import java.io.Serializable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Runs an infinite loop and waits for cancellation.
+ */
+public final class InfiniteLoopWithCancellationFunction implements VortexFunction {
+ private static final Logger LOG = Logger.getLogger(InfiniteLoopWithCancellationFunction.class.getName());
+
+ @Override
+ public Serializable call(final Serializable serializable) throws Exception {
+ LOG.log(Level.FINE, "Entered Infinite Loop Tasklet.");
+ while (true) {
+ Thread.sleep(100);
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("Expected exception!");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java
new file mode 100644
index 0000000..d8ea09c
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.tests.applications.vortex.cancellation;
+
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.vortex.driver.VortexConfHelper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the cancellation of a tasklet.
+ */
+public final class TaskletCancellationTest {
+ private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+ /**
+ * Set up the test environment.
+ */
+ @Before
+ public void setUp() throws Exception {
+ this.testEnvironment.setUp();
+ }
+
+ /**
+ * Tear down the test environment.
+ */
+ @After
+ public void tearDown() throws Exception {
+ this.testEnvironment.tearDown();
+ }
+
+ @Test
+ public void testVortexTaskletCancellation() {
+ final Configuration conf =
+ VortexConfHelper.getVortexConf(
+ "TEST_Vortex_TaskletCancellationTest", TaskletCancellationTestStart.class, 2, 64, 4, 2000);
+ final LauncherStatus status = this.testEnvironment.run(conf);
+ Assert.assertTrue("Job state after execution: " + status, status.isSuccess());
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java
new file mode 100644
index 0000000..d60e0b5
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.tests.applications.vortex.cancellation;
+
+import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.api.VortexStart;
+import org.apache.reef.vortex.api.VortexThreadPool;
+import org.junit.Assert;
+
+import javax.inject.Inject;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Vortex start routine for the cancellation test: submits a Tasklet, cancels it, and checks its future.
+ */
+public final class TaskletCancellationTestStart implements VortexStart {
+  @Inject
+  private TaskletCancellationTestStart() {
+  }
+
+  /**
+   * Submits a Tasklet, cancels it through its future, and asserts that the future reports cancelled and done.
+   * @param vortexThreadPool the pool that the Tasklet is submitted to.
+   */
+  @Override
+  public void start(final VortexThreadPool vortexThreadPool) {
+    final InfiniteLoopWithCancellationFunction function = new InfiniteLoopWithCancellationFunction();
+    final VortexFuture future = vortexThreadPool.submit(function, 0);
+
+    try {
+      // Hacky way to increase probability that the task has been launched.
+      // TODO[JIRA REEF-1051]: Query the VortexMaster for the Tasklet status.
+      future.get(10, TimeUnit.SECONDS);
+    } catch (final TimeoutException e) {
+      // Harmless: the bounded wait above only gives the Tasklet time to launch.
+    } catch (final Exception e) {
+      throw new RuntimeException("Unexpected exception.", e);
+    }
+
+    Assert.assertTrue(future.cancel(true));
+    try {
+      future.get();
+      Assert.fail();
+    } catch (final ExecutionException e) {
+      // Expected.
+    } catch (final InterruptedException e) {
+      Thread.currentThread().interrupt(); // Restore the interrupt flag for the caller.
+      throw new AssertionError("Interrupted while waiting for the cancelled Tasklet.", e);
+    }
+    Assert.assertTrue(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java
new file mode 100644
index 0000000..8bd04c7
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Package for testing Vortex Tasklet cancellation.
+ */
+package org.apache.reef.tests.applications.vortex.cancellation;
\ No newline at end of file