You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by bg...@apache.org on 2015/12/10 10:47:43 UTC

reef git commit: [REEF-502] Support Vortex Tasklet(s) cancellation by user

Repository: reef
Updated Branches:
  refs/heads/master 766883f0c -> fe6194cac


[REEF-502] Support Vortex Tasklet(s) cancellation by user

JIRA:
  [REEF-502](https://issues.apache.org/jira/browse/REEF-502)

Pull Request:
  Closes #708


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/fe6194ca
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/fe6194ca
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/fe6194ca

Branch: refs/heads/master
Commit: fe6194cacbfa38714a295790fc302d5c86a01726
Parents: 766883f
Author: Andrew Chung <af...@gmail.com>
Authored: Fri Dec 4 17:38:34 2015 -0800
Committer: Byung-Gon Chun <bg...@apache.org>
Committed: Thu Dec 10 18:46:11 2015 +0900

----------------------------------------------------------------------
 .../src/main/avro/VortexRequest.avsc            |  18 +++-
 .../reef-vortex/src/main/avro/WorkerReport.avsc |  22 ++--
 .../apache/reef/vortex/api/VortexFuture.java    | 101 +++++++++++++++++--
 .../common/TaskletCancellationRequest.java      |  43 ++++++++
 .../vortex/common/TaskletCancelledReport.java   |  46 +++++++++
 .../vortex/common/TaskletExecutionRequest.java  |   1 +
 .../vortex/common/TaskletFailureReport.java     |   1 +
 .../reef/vortex/common/TaskletResultReport.java |   1 +
 .../reef/vortex/common/VortexAvroUtils.java     |  45 +++++++--
 .../reef/vortex/common/VortexRequest.java       |   8 +-
 .../apache/reef/vortex/common/WorkerReport.java |   6 ++
 .../reef/vortex/driver/DefaultVortexMaster.java |  24 ++++-
 .../vortex/driver/FirstFitSchedulingPolicy.java |   6 ++
 .../reef/vortex/driver/PendingTasklets.java     |   2 +-
 .../vortex/driver/RandomSchedulingPolicy.java   |   8 ++
 .../reef/vortex/driver/RunningWorkers.java      |  86 ++++++++++++++--
 .../reef/vortex/driver/SchedulingPolicy.java    |   5 +
 .../org/apache/reef/vortex/driver/Tasklet.java  |   7 ++
 .../apache/reef/vortex/driver/VortexDriver.java |   5 +
 .../apache/reef/vortex/driver/VortexMaster.java |  13 +++
 .../reef/vortex/driver/VortexWorkerManager.java |  13 +++
 .../reef/vortex/evaluator/VortexWorker.java     |  93 +++++++++++------
 .../vortex/driver/DefaultVortexMasterTest.java  |  52 ++++++++++
 .../org/apache/reef/vortex/driver/TestUtil.java |  42 +++++++-
 .../applications/vortex/VortexTestSuite.java    |   4 +-
 .../InfiniteLoopWithCancellationFunction.java   |  44 ++++++++
 .../cancellation/TaskletCancellationTest.java   |  62 ++++++++++++
 .../TaskletCancellationTestStart.java           |  72 +++++++++++++
 .../vortex/cancellation/package-info.java       |  23 +++++
 29 files changed, 780 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
index 2453761..bf4396e 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
@@ -30,10 +30,24 @@
   {
     "namespace": "org.apache.reef.vortex.common.avro",
     "type": "record",
+    "name": "AvroTaskletCancellationRequest",
+    "fields": [{"name": "taskletId", "type": "int"}]
+  },
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
     "name": "AvroVortexRequest",
     "fields": [
-      {"name": "requestType", "type": {"type": "enum", "name": "AvroRequestType", "symbols": ["ExecuteTasklet"]}},
-      {"name": "taskletExecutionRequest", "type": ["null", "AvroTaskletExecutionRequest"], "default": null}
+      {
+        "name": "requestType",
+        "type": {"type": "enum", "name": "AvroRequestType",
+        "symbols": ["ExecuteTasklet", "CancelTasklet"]}
+      },
+      {
+        "name": "taskletRequest",
+        "type": ["null", "AvroTaskletExecutionRequest", "AvroTaskletCancellationRequest"],
+        "default": null
+      }
     ]
   }
 ]

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
index 19a655d..11adb12 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
@@ -29,6 +29,14 @@
   {
     "namespace": "org.apache.reef.vortex.common.avro",
     "type": "record",
+    "name": "AvroTaskletCancelledReport",
+    "fields": [
+      {"name": "taskletId", "type": "int"}
+    ]
+  },
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
     "name": "AvroTaskletFailureReport",
     "fields": [
       {"name": "taskletId", "type": "int"},
@@ -42,15 +50,15 @@
     "fields": [
       {
         "name": "reportType",
-        "type": {"type": "enum", "name": "AvroReportType", "symbols": ["TaskletResult", "TaskletFailure"]}
-      },
-      {
-        "name": "taskletResult",
-        "type": ["null", "AvroTaskletResultReport"], "default": null
+        "type": {
+          "type": "enum",
+          "name": "AvroReportType",
+          "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"]}
       },
       {
-        "name": "taskletFailure",
-        "type": ["null", "AvroTaskletFailureReport"], "default": null
+        "name": "taskletReport",
+        "type": ["null", "AvroTaskletResultReport", "AvroTaskletCancelledReport", "AvroTaskletFailureReport"],
+        "default": null
       }
     ]
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
index 34cf6b6..019eeb0 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
@@ -21,28 +21,37 @@ package org.apache.reef.vortex.api;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.util.Optional;
+import org.apache.reef.vortex.driver.VortexMaster;
 
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * The interface between user code and submitted task.
  */
 @Unstable
 public final class VortexFuture<TOutput> implements Future<TOutput> {
+  private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName());
+
   // userResult starts out as null. If not null => variable is set and tasklet returned.
   // Otherwise tasklet has not completed.
   private Optional<TOutput> userResult = null;
   private Exception userException;
+  private AtomicBoolean cancelled = new AtomicBoolean(false);
   private final CountDownLatch countDownLatch = new CountDownLatch(1);
   private final FutureCallback<TOutput> callbackHandler;
   private final Executor executor;
+  private final VortexMaster vortexMaster;
+  private final int taskletId;
 
   /**
    * Creates a {@link VortexFuture}.
    */
   @Private
-  public VortexFuture(final Executor executor) {
-    this(executor, null);
+  public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId) {
+    this(executor, vortexMaster, taskletId, null);
   }
 
   /**
@@ -50,25 +59,71 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
    */
   @Private
   public VortexFuture(final Executor executor,
+                      final VortexMaster vortexMaster,
+                      final int taskletId,
                       final FutureCallback<TOutput> callbackHandler) {
     this.executor = executor;
+    this.vortexMaster = vortexMaster;
+    this.taskletId = taskletId;
     this.callbackHandler = callbackHandler;
   }
 
   /**
-   * TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user.
+   * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed.
+   * @return true if task did not start or was cancelled, false if task failed or completed
    */
   @Override
   public boolean cancel(final boolean mayInterruptIfRunning) {
-    throw new UnsupportedOperationException("Cancel not yet supported");
+    try {
+      return cancel(mayInterruptIfRunning, Optional.<Long>empty(), Optional.<TimeUnit>empty());
+    } catch (final TimeoutException e) {
+      // This should never happen.
+      LOG.log(Level.WARNING, "Received a TimeoutException in VortexFuture.cancel(). Should not have occurred.");
+      return false;
+    }
   }
 
   /**
-   * TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user.
+   * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed, or
+   * if the timeout has expired.
+   * @return true if task did not start or was cancelled, false if task failed or completed
+   */
+  public boolean cancel(final boolean mayInterruptIfRunning, final long timeout, final TimeUnit unit)
+      throws TimeoutException {
+    return cancel(mayInterruptIfRunning, Optional.of(timeout), Optional.of(unit));
+  }
+
+  private boolean cancel(final boolean mayInterruptIfRunning,
+                         final Optional<Long> timeout,
+                         final Optional<TimeUnit> unit) throws TimeoutException {
+    if (isDone()) {
+      return isCancelled();
+    }
+
+    vortexMaster.cancelTasklet(mayInterruptIfRunning, taskletId);
+
+    try {
+      if (timeout.isPresent() && unit.isPresent()) {
+        if (!countDownLatch.await(timeout.get(), unit.get())) {
+          throw new TimeoutException();
+        }
+      } else {
+        countDownLatch.await();
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      return false;
+    }
+
+    return isCancelled();
+  }
+
+  /**
+   * @return true if the task is cancelled, false if not.
    */
   @Override
   public boolean isCancelled() {
-    throw new UnsupportedOperationException("Cancel not yet supported");
+    return cancelled.get();
   }
 
   /**
@@ -88,8 +143,12 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
     if (userResult != null) {
       return userResult.get();
     } else {
-      assert userException != null;
-      throw new ExecutionException(userException);
+      assert this.cancelled.get() || userException != null;
+      if (userException != null) {
+        throw new ExecutionException(userException);
+      }
+
+      throw new ExecutionException(new InterruptedException("Task was cancelled."));
     }
   }
 
@@ -106,8 +165,12 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
     if (userResult != null) {
       return userResult.get();
     } else {
-      assert userException != null;
-      throw new ExecutionException(userException);
+      assert this.cancelled.get() || userException != null;
+      if (userException != null) {
+        throw new ExecutionException(userException);
+      }
+
+      throw new ExecutionException(new InterruptedException("Task was cancelled."));
     }
   }
 
@@ -130,6 +193,7 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
   /**
    * Called by VortexMaster to let the user know that the task threw an exception.
    */
+  @Private
   public void threwException(final Exception exception) {
     this.userException = exception;
     if (callbackHandler != null) {
@@ -142,4 +206,21 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
     }
     this.countDownLatch.countDown();
   }
+
+  /**
+   * Called by VortexMaster to let the user know that the task was cancelled.
+   */
+  @Private
+  public void cancelled() {
+    this.cancelled.set(true);
+    if (callbackHandler != null) {
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          callbackHandler.onFailure(new InterruptedException("VortexFuture has been cancelled on request."));
+        }
+      });
+    }
+    this.countDownLatch.countDown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
new file mode 100644
index 0000000..8f725a8
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * A {@link VortexRequest} to cancel tasklets.
+ */
+@Unstable
+public final class TaskletCancellationRequest implements VortexRequest {
+  private final int taskletId;
+
+  public TaskletCancellationRequest(final int taskletId) {
+    this.taskletId = taskletId;
+  }
+
+  @Override
+  public int getTaskletId() {
+    return taskletId;
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.CancelTasklet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
new file mode 100644
index 0000000..b51219b
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * The report of a cancelled Tasklet.
+ */
+@Unstable
+public final class TaskletCancelledReport implements WorkerReport {
+  private int taskletId;
+
+  /**
+   * @param taskletId of the cancelled tasklet.
+   */
+  public TaskletCancelledReport(final int taskletId) {
+    this.taskletId = taskletId;
+  }
+
+  @Override
+  public WorkerReportType getType() {
+    return WorkerReportType.TaskletCancelled;
+  }
+
+  @Override
+  public int getTaskletId() {
+    return taskletId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
index 961b574..8e43e4b 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
@@ -62,6 +62,7 @@ public final class TaskletExecutionRequest<TInput extends Serializable, TOutput
   /**
    * Get id of the tasklet.
    */
+  @Override
   public int getTaskletId() {
     return taskletId;
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
index dc847e7..96df2e1 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
@@ -48,6 +48,7 @@ public final class TaskletFailureReport implements WorkerReport {
   /**
    * @return the id of the tasklet.
    */
+  @Override
   public int getTaskletId() {
     return taskletId;
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
index 0a6fab3..cd3a597 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
@@ -50,6 +50,7 @@ public final class TaskletResultReport<TOutput extends Serializable> implements
   /**
    * @return the id of the tasklet.
    */
+  @Override
   public int getTaskletId() {
     return taskletId;
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
index c3d01de..4ab4cb5 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -54,7 +54,7 @@ public final class VortexAvroUtils {
       final byte[] serializedFunction = SerializationUtils.serialize(taskletExecutionRequest.getFunction());
       avroVortexRequest = AvroVortexRequest.newBuilder()
           .setRequestType(AvroRequestType.ExecuteTasklet)
-          .setTaskletExecutionRequest(
+          .setTaskletRequest(
               AvroTaskletExecutionRequest.newBuilder()
                   .setTaskletId(taskletExecutionRequest.getTaskletId())
                   .setSerializedInput(ByteBuffer.wrap(serializedInput))
@@ -62,6 +62,16 @@ public final class VortexAvroUtils {
                   .build())
           .build();
       break;
+    case CancelTasklet:
+      final TaskletCancellationRequest taskletCancellationRequest = (TaskletCancellationRequest) vortexRequest;
+      avroVortexRequest = AvroVortexRequest.newBuilder()
+          .setRequestType(AvroRequestType.CancelTasklet)
+          .setTaskletRequest(
+              AvroTaskletCancellationRequest.newBuilder()
+                  .setTaskletId(taskletCancellationRequest.getTaskletId())
+                  .build())
+          .build();
+      break;
     default:
       throw new RuntimeException("Undefined message type");
     }
@@ -85,19 +95,29 @@ public final class VortexAvroUtils {
       final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult());
       avroWorkerReport = AvroWorkerReport.newBuilder()
           .setReportType(AvroReportType.TaskletResult)
-          .setTaskletResult(
+          .setTaskletReport(
               AvroTaskletResultReport.newBuilder()
                   .setTaskletId(taskletResultReport.getTaskletId())
                   .setSerializedOutput(ByteBuffer.wrap(serializedOutput))
                   .build())
           .build();
       break;
+    case TaskletCancelled:
+      final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) workerReport;
+      avroWorkerReport = AvroWorkerReport.newBuilder()
+          .setReportType(AvroReportType.TaskletCancelled)
+          .setTaskletReport(
+              AvroTaskletCancelledReport.newBuilder()
+                  .setTaskletId(workerReport.getTaskletId())
+                  .build())
+          .build();
+      break;
     case TaskletFailure:
       final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
       final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException());
       avroWorkerReport = AvroWorkerReport.newBuilder()
           .setReportType(AvroReportType.TaskletFailure)
-          .setTaskletFailure(
+          .setTaskletReport(
               AvroTaskletFailureReport.newBuilder()
                   .setTaskletId(taskletFailureReport.getTaskletId())
                   .setSerializedException(ByteBuffer.wrap(serializedException))
@@ -123,7 +143,8 @@ public final class VortexAvroUtils {
     final VortexRequest vortexRequest;
     switch (avroVortexRequest.getRequestType()) {
     case ExecuteTasklet:
-      final AvroTaskletExecutionRequest taskletExecutionRequest = avroVortexRequest.getTaskletExecutionRequest();
+      final AvroTaskletExecutionRequest taskletExecutionRequest =
+          (AvroTaskletExecutionRequest)avroVortexRequest.getTaskletRequest();
       // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction
       final VortexFunction function =
           (VortexFunction) SerializationUtils.deserialize(
@@ -134,6 +155,11 @@ public final class VortexAvroUtils {
               taskletExecutionRequest.getSerializedInput().array());
       vortexRequest = new TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function, input);
       break;
+    case CancelTasklet:
+      final AvroTaskletCancellationRequest taskletCancellationRequest =
+          (AvroTaskletCancellationRequest)avroVortexRequest.getTaskletRequest();
+      vortexRequest = new TaskletCancellationRequest(taskletCancellationRequest.getTaskletId());
+      break;
     default:
       throw new RuntimeException("Undefined VortexRequest type");
     }
@@ -150,14 +176,21 @@ public final class VortexAvroUtils {
     final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class);
     switch (avroWorkerReport.getReportType()) {
     case TaskletResult:
-      final AvroTaskletResultReport taskletResultReport = avroWorkerReport.getTaskletResult();
+      final AvroTaskletResultReport taskletResultReport =
+          (AvroTaskletResultReport)avroWorkerReport.getTaskletReport();
       // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
       final Serializable output =
           (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
       workerReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
       break;
+    case TaskletCancelled:
+      final AvroTaskletCancelledReport taskletCancelledReport =
+          (AvroTaskletCancelledReport)avroWorkerReport.getTaskletReport();
+      workerReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId());
+      break;
     case TaskletFailure:
-      final AvroTaskletFailureReport taskletFailureReport = avroWorkerReport.getTaskletFailure();
+      final AvroTaskletFailureReport taskletFailureReport =
+          (AvroTaskletFailureReport)avroWorkerReport.getTaskletReport();
       final Exception exception =
           (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
       workerReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
index 51061c2..5d59a96 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
@@ -31,10 +31,16 @@ public interface VortexRequest extends Serializable {
    * Type of Request.
    */
   enum RequestType {
-    ExecuteTasklet
+    ExecuteTasklet,
+    CancelTasklet
   }
 
   /**
+   * @return the ID of the VortexTasklet associated with this VortexRequest.
+   */
+  int getTaskletId();
+
+  /**
    * @return the type of this VortexRequest.
    */
   RequestType getType();

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
index 7e320cb..192299a 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
@@ -32,6 +32,7 @@ public interface WorkerReport extends Serializable {
    */
   enum WorkerReportType {
     TaskletResult,
+    TaskletCancelled,
     TaskletFailure
   }
 
@@ -39,4 +40,9 @@ public interface WorkerReport extends Serializable {
    * @return the type of this WorkerReport.
    */
   WorkerReportType getType();
+
+  /**
+   * @return the taskletId of this WorkerReport.
+   */
+  int getTaskletId();
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index a4b5ef0..cb62049 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -66,17 +66,27 @@ final class DefaultVortexMaster implements VortexMaster {
                      final Optional<FutureCallback<TOutput>> callback) {
     // TODO[REEF-500]: Simple duplicate Vortex Tasklet launch.
     final VortexFuture<TOutput> vortexFuture;
+    final int id = taskletIdCounter.getAndIncrement();
     if (callback.isPresent()) {
-      vortexFuture = new VortexFuture<>(executor, callback.get());
+      vortexFuture = new VortexFuture<>(executor, this, id, callback.get());
     } else {
-      vortexFuture = new VortexFuture<>(executor);
+      vortexFuture = new VortexFuture<>(executor, this, id);
     }
 
-    this.pendingTasklets.addLast(new Tasklet<>(taskletIdCounter.getAndIncrement(), function, input, vortexFuture));
+    final Tasklet tasklet = new Tasklet<>(id, function, input, vortexFuture);
+    this.pendingTasklets.addLast(tasklet);
     return vortexFuture;
   }
 
   /**
+   * Cancels tasklets on the running workers.
+   */
+  @Override
+  public void cancelTasklet(final boolean mayInterruptIfRunning, final int taskletId) {
+    this.runningWorkers.cancelTasklet(mayInterruptIfRunning, taskletId);
+  }
+
+  /**
    * Add a new worker to runningWorkers.
    */
   @Override
@@ -116,6 +126,14 @@ final class DefaultVortexMaster implements VortexMaster {
   }
 
   /**
+   * Notify tasklet cancellation to runningWorkers.
+   */
+  @Override
+  public void taskletCancelled(final String workerId, final int taskletId) {
+    runningWorkers.taskletCancelled(workerId, taskletId);
+  }
+
+  /**
    * Terminate the job.
    */
   @Override

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
index 6dfd8bb..8b28eab 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
@@ -143,6 +143,12 @@ class FirstFitSchedulingPolicy implements SchedulingPolicy {
     removeTasklet(workerId);
   }
 
+  @Override
+  public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
+    final String workerId = vortexWorker.getId();
+    removeTasklet(workerId);
+  }
+
   private void removeTasklet(final String workerId) {
     if (idLoadMap.containsKey(workerId)) {
       idLoadMap.put(workerId, Math.max(0, idLoadMap.get(workerId) - 1));

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
index eac4fc6..9b3e318 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
@@ -48,4 +48,4 @@ final class PendingTasklets {
   Tasklet takeFirst() throws InterruptedException {
     return pendingTasklets.takeFirst();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
index 2f71cd7..ffa2fbf 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
@@ -102,4 +102,12 @@ class RandomSchedulingPolicy implements SchedulingPolicy {
     // Do nothing
   }
 
+  /**
+   * Do nothing.
+   */
+  @Override
+  public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
+    // Do nothing
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
index eebac5f..7b05c0c 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
@@ -30,6 +30,8 @@ import java.util.*;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * Keeps track of all running VortexWorkers and Tasklets.
@@ -38,8 +40,12 @@ import java.util.concurrent.locks.ReentrantLock;
 @ThreadSafe
 @DriverSide
 final class RunningWorkers {
+  private static final Logger LOG = Logger.getLogger(RunningWorkers.class.getName());
+
   // RunningWorkers and its locks
   private final HashMap<String, VortexWorkerManager> runningWorkers = new HashMap<>(); // Running workers/tasklets
+  private final Set<Integer> taskletsToCancel = new HashSet<>();
+
   private final Lock lock = new ReentrantLock();
   private final Condition noWorkerOrResource = lock.newCondition();
 
@@ -132,6 +138,14 @@ final class RunningWorkers {
           }
         }
 
+        // TODO[JIRA REEF-500]: Will need to support duplicate tasklets.
+        if (taskletsToCancel.contains(tasklet.getId())) {
+          tasklet.cancelled();
+          taskletsToCancel.remove(tasklet.getId());
+          LOG.log(Level.FINE, "Cancelled tasklet {0}.", tasklet.getId());
+          return;
+        }
+
         final VortexWorkerManager vortexWorkerManager = runningWorkers.get(workerId.get());
         vortexWorkerManager.launchTasklet(tasklet);
         schedulingPolicy.taskletLaunched(vortexWorkerManager, tasklet);
@@ -143,6 +157,30 @@ final class RunningWorkers {
 
   /**
    * Concurrency: Called by multiple threads.
+   * Parameter: Same taskletId can come in multiple times.
+   */
+  void cancelTasklet(final boolean mayInterruptIfRunning, final int taskletId) {
+    lock.lock();
+    try {
+      // This is not ideal since we are using a linear time search on all the workers.
+      final String workerId = getWhereTaskletWasScheduledTo(taskletId);
+      if (workerId == null) {
+        // launchTasklet called but not yet running.
+        taskletsToCancel.add(taskletId);
+        return;
+      }
+
+      if (mayInterruptIfRunning) {
+        LOG.log(Level.FINE, "Cancelling running Tasklet with ID {0}.", taskletId);
+        runningWorkers.get(workerId).cancelTasklet(taskletId);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Concurrency: Called by multiple threads.
    * Parameter: Same arguments can come in multiple times.
    * (e.g. preemption message coming before tasklet completion message multiple times)
    */
@@ -158,6 +196,8 @@ final class RunningWorkers {
 
         // Notify (possibly) waiting scheduler
         noWorkerOrResource.signal();
+
+        taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
       }
     } finally {
       lock.unlock();
@@ -181,6 +221,32 @@ final class RunningWorkers {
 
         // Notify (possibly) waiting scheduler
         noWorkerOrResource.signal();
+
+        taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Concurrency: Called by multiple threads.
+   * Parameter: Same arguments can come in multiple times.
+   * (e.g. preemption message coming before tasklet error message multiple times)
+   */
+  void taskletCancelled(final String workerId,
+                        final int taskletId) {
+    lock.lock();
+    try {
+      if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before
+        final VortexWorkerManager worker = this.runningWorkers.get(workerId);
+        final Tasklet tasklet = worker.taskletCancelled(taskletId);
+        this.schedulingPolicy.taskletCancelled(worker, tasklet);
+
+        // Notify (possibly) waiting scheduler
+        noWorkerOrResource.signal();
+
+        taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
       }
     } finally {
       lock.unlock();
@@ -209,17 +275,8 @@ final class RunningWorkers {
     return terminated;
   }
 
-  ///////////////////////////////////////// For Tests Only
-
-  /**
-   * For unit tests to check whether the worker is running.
-   */
-  boolean isWorkerRunning(final String workerId) {
-    return runningWorkers.containsKey(workerId);
-  }
-
   /**
-   * For unit tests to see where a tasklet is scheduled to.
+   * Find where a tasklet is scheduled to.
    * @param taskletId id of the tasklet in question
    * @return id of the worker (null if the tasklet was not scheduled to any worker)
    */
@@ -233,4 +290,13 @@ final class RunningWorkers {
     }
     return null;
   }
+
+  ///////////////////////////////////////// For Tests Only
+
+  /**
+   * For unit tests to check whether the worker is running.
+   */
+  boolean isWorkerRunning(final String workerId) {
+    return runningWorkers.containsKey(workerId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
index cef1cb6..a058f6f 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
@@ -57,4 +57,9 @@ interface SchedulingPolicy {
    * Tasklet failed.
    */
   void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
+
+  /**
+   * Tasklet cancelled.
+   */
+  void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
index 6cd6c44..b0138d0 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
@@ -80,6 +80,13 @@ class Tasklet<TInput extends Serializable, TOutput extends Serializable> {
   }
 
   /**
+   * Called by VortexMaster to let the user know that the task has been cancelled.
+   */
+  void cancelled(){
+    vortexFuture.cancelled();
+  }
+
+  /**
    * For tests.
    */
   boolean isCompleted() {

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index 352cdbd..75674cc 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -28,6 +28,7 @@ import org.apache.reef.tang.Configurations;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.annotations.Unit;
 import org.apache.reef.vortex.api.VortexStart;
+import org.apache.reef.vortex.common.TaskletCancelledReport;
 import org.apache.reef.vortex.common.TaskletFailureReport;
 import org.apache.reef.vortex.common.TaskletResultReport;
 import org.apache.reef.vortex.common.VortexAvroUtils;
@@ -160,6 +161,10 @@ final class VortexDriver {
         final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
         vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult());
         break;
+      case TaskletCancelled:
+        final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport)workerReport;
+        vortexMaster.taskletCancelled(workerId, taskletCancelledReport.getTaskletId());
+        break;
       case TaskletFailure:
         final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
         vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(),

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
index 5c7f684..38bbcc5 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -44,6 +44,14 @@ public interface VortexMaster {
                      final Optional<FutureCallback<TOutput>> callback);
 
   /**
+   * Call this when a Tasklet is to be cancelled.
+   * @param mayInterruptIfRunning if true, will attempt to cancel running Tasklets; otherwise will only
+   *                              prevent a pending Tasklet from running.
+   * @param taskletId the ID of the Tasklet.
+   */
+  void cancelTasklet(final boolean mayInterruptIfRunning, final int taskletId);
+
+  /**
    * Call this when a new worker is up and running.
    */
   void workerAllocated(final VortexWorkerManager vortexWorkerManager);
@@ -64,6 +72,11 @@ public interface VortexMaster {
   void taskletErrored(final String workerId, final int taskletId, final Exception exception);
 
   /**
+   * Call this when a Tasklet is cancelled and the cancellation is honored.
+   */
+  void taskletCancelled(final String workerId, final int taskletId);
+
+  /**
    * Release all resources and shut down.
    */
   void terminate();

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
index f019668..76d6cec 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
@@ -21,6 +21,7 @@ package org.apache.reef.vortex.driver;
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.vortex.common.TaskletCancellationRequest;
 import org.apache.reef.vortex.common.TaskletExecutionRequest;
 
 import java.io.Serializable;
@@ -51,6 +52,11 @@ class VortexWorkerManager {
     vortexRequestor.send(reefTask, taskletExecutionRequest);
   }
 
+  void cancelTasklet(final int taskletId) {
+    final TaskletCancellationRequest cancellationRequest = new TaskletCancellationRequest(taskletId);
+    vortexRequestor.send(reefTask, cancellationRequest);
+  }
+
   <TOutput extends Serializable> Tasklet taskletCompleted(final Integer taskletId, final TOutput result) {
     final Tasklet<?, TOutput> tasklet = runningTasklets.remove(taskletId);
     assert tasklet != null; // Tasklet should complete/error only once
@@ -65,6 +71,13 @@ class VortexWorkerManager {
     return tasklet;
   }
 
+  Tasklet taskletCancelled(final Integer taskletId) {
+    final Tasklet tasklet = runningTasklets.remove(taskletId);
+    assert tasklet != null; // Tasklet should finish only once.
+    tasklet.cancelled();
+    return tasklet;
+  }
+
   Collection<Tasklet> removed() {
     return runningTasklets.isEmpty() ? null : runningTasklets.values();
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index e65830c..10d23a8 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -36,6 +36,8 @@ import org.apache.reef.wake.EventHandler;
 import javax.inject.Inject;
 import java.io.Serializable;
 import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * Receives commands from VortexMaster, executes them, and returns the results.
@@ -45,6 +47,7 @@ import java.util.concurrent.*;
 @Unit
 @TaskSide
 public final class VortexWorker implements Task, TaskMessageSource {
+  private static final Logger LOG = Logger.getLogger(VortexWorker.class.getName());
   private static final String MESSAGE_SOURCE_ID = ""; // empty string as there is no use for it
 
   private final BlockingDeque<byte[]> pendingRequests = new LinkedBlockingDeque<>();
@@ -68,6 +71,7 @@ public final class VortexWorker implements Task, TaskMessageSource {
   public byte[] call(final byte[] memento) throws Exception {
     final ExecutorService schedulerThread = Executors.newSingleThreadExecutor();
     final ExecutorService commandExecutor = Executors.newFixedThreadPool(numOfThreads);
+    final ConcurrentMap<Integer, Future> futures = new ConcurrentHashMap<>();
 
     // Scheduling thread starts
     schedulerThread.execute(new Runnable() {
@@ -82,37 +86,66 @@ public final class VortexWorker implements Task, TaskMessageSource {
             throw new RuntimeException(e);
           }
 
-          // Scheduler Thread: Pass the command to the worker thread pool to be executed
-          commandExecutor.execute(new Runnable() {
-            @Override
-            public void run() {
-              // Command Executor: Deserialize the command
-              final VortexRequest vortexRequest = VortexAvroUtils.toVortexRequest(message);
-              switch (vortexRequest.getType()) {
-              case ExecuteTasklet:
-                final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
-                try {
-                  // Command Executor: Execute the command
-                  final Serializable result = taskletExecutionRequest.execute();
-
-                  // Command Executor: Tasklet successfully returns result
-                  final WorkerReport report =
-                      new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
-                  workerReports.addLast(VortexAvroUtils.toBytes(report));
-                } catch (Exception e) {
-                  // Command Executor: Tasklet throws an exception
-                  final WorkerReport report =
-                      new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
-                  workerReports.addLast(VortexAvroUtils.toBytes(report));
-                }
-
-                heartBeatTriggerManager.triggerHeartBeat();
-                break;
-              default:
-                throw new RuntimeException("Unknown Command");
+          // Command Executor: Deserialize the command
+          final VortexRequest vortexRequest = VortexAvroUtils.toVortexRequest(message);
+
+          switch (vortexRequest.getType()) {
+            case ExecuteTasklet:
+              final CountDownLatch latch = new CountDownLatch(1);
+
+              // Scheduler Thread: Pass the command to the worker thread pool to be executed
+              // Record future to support cancellation.
+              futures.put(
+                  vortexRequest.getTaskletId(),
+                  commandExecutor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                      final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
+                      try {
+                        // Command Executor: Execute the command
+                        final Serializable result = taskletExecutionRequest.execute();
+                        final WorkerReport report =
+                            new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
+                        workerReports.addLast(VortexAvroUtils.toBytes(report));
+                      } catch (final InterruptedException ex) {
+                        // Assumes that user's thread follows convention that cancelled Futures
+                        // should throw InterruptedException.
+                        final WorkerReport report = new TaskletCancelledReport(taskletExecutionRequest.getTaskletId());
+                        LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", vortexRequest.getTaskletId());
+                        workerReports.addLast(VortexAvroUtils.toBytes(report));
+                      } catch (Exception e) {
+                        // Command Executor: Tasklet throws an exception
+                        final WorkerReport report =
+                            new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
+                        workerReports.addLast(VortexAvroUtils.toBytes(report));
+                      }
+
+                      try {
+                        latch.await();
+                      } catch (final InterruptedException e) {
+                        LOG.log(Level.SEVERE, "Cannot wait for Future to be put.");
+                        throw new RuntimeException(e);
+                      }
+
+                      futures.remove(vortexRequest.getTaskletId());
+                      heartBeatTriggerManager.triggerHeartBeat();
+                    }
+                  }));
+
+              // Signal that future is put.
+              latch.countDown();
+              break;
+            case CancelTasklet:
+              LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", vortexRequest.getTaskletId());
+              final Future future = futures.get(vortexRequest.getTaskletId());
+              if (future != null) {
+                future.cancel(true);
               }
-            }
-          });
+
+              break;
+            default:
+              throw new RuntimeException("Unknown Command");
+          }
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
index 37298fa..6c2162a 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
@@ -26,6 +26,8 @@ import org.junit.Test;
 
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.*;
@@ -204,6 +206,56 @@ public class DefaultVortexMasterTest {
   }
 
   /**
+   * Test handling of single tasklet execution with a cancellation after launch.
+   */
+  @Test(timeout = 10000)
+  public void testSingleTaskletCancellation() throws Exception {
+    final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy());
+    final PendingTasklets pendingTasklets = new PendingTasklets();
+    final VortexFuture future = createTaskletCancellationFuture(runningWorkers, pendingTasklets);
+    launchTasklets(runningWorkers, pendingTasklets, 1);
+
+    assertTrue(future.cancel(true));
+    assertTrue("The VortexFuture should be cancelled.", future.isCancelled());
+    assertTrue("The VortexFuture should be done", future.isDone());
+  }
+
+  /**
+   * Test handling of single tasklet execution with a cancellation before launch.
+   */
+  @Test(timeout = 10000)
+  public void testSingleTaskletCancellationBeforeLaunch() throws Exception {
+
+    final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy());
+    final PendingTasklets pendingTasklets = new PendingTasklets();
+    final VortexFuture future = createTaskletCancellationFuture(runningWorkers, pendingTasklets);
+
+    try {
+      future.cancel(true, 100, TimeUnit.MILLISECONDS);
+      fail();
+    } catch (final TimeoutException e) {
+      // TimeoutException is expected.
+    }
+
+    launchTasklets(runningWorkers, pendingTasklets, 1);
+    assertTrue(future.cancel(true));
+    assertTrue("The VortexFuture should be cancelled.", future.isCancelled());
+    assertTrue("The VortexFuture should be done", future.isDone());
+  }
+
+  private VortexFuture createTaskletCancellationFuture(final RunningWorkers runningWorkers,
+                                                       final PendingTasklets pendingTasklets) {
+    final VortexFunction vortexFunction = testUtil.newInfiniteLoopFunction();
+    final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
+
+    final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
+
+    // Allocate worker & tasklet and schedule
+    vortexMaster.workerAllocated(vortexWorkerManager1);
+    return vortexMaster.enqueueTasklet(vortexFunction, null, Optional.<FutureCallback<Integer>>empty());
+  }
+
+  /**
    * Launch specified number of tasklets as a substitute for PendingTaskletLauncher.
    * @return ids of launched tasklets
    */

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
index fc333e3..8704b0b 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
@@ -21,12 +21,18 @@ package org.apache.reef.vortex.driver;
 import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.common.TaskletCancellationRequest;
+import org.apache.reef.vortex.common.VortexRequest;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.Serializable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -37,6 +43,7 @@ public final class TestUtil {
   private final AtomicInteger taskletId = new AtomicInteger(0);
   private final AtomicInteger workerId = new AtomicInteger(0);
   private final Executor executor = Executors.newFixedThreadPool(5);
+  private final VortexMaster vortexMaster = mock(VortexMaster.class);
 
   /**
    * @return a new mocked worker.
@@ -45,14 +52,28 @@ public final class TestUtil {
     final RunningTask reefTask = mock(RunningTask.class);
     when(reefTask.getId()).thenReturn("worker" + String.valueOf(workerId.getAndIncrement()));
     final VortexRequestor vortexRequestor = mock(VortexRequestor.class);
-    return new VortexWorkerManager(vortexRequestor, reefTask);
+    final VortexWorkerManager workerManager = new VortexWorkerManager(vortexRequestor, reefTask);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(final InvocationOnMock invocation) throws Throwable {
+        final VortexRequest request = (VortexRequest)invocation.getArguments()[1];
+        if (request instanceof TaskletCancellationRequest) {
+          workerManager.taskletCancelled(request.getTaskletId());
+        }
+
+        return null;
+      }
+    }).when(vortexRequestor).send(any(RunningTask.class), any(VortexRequest.class));
+
+    return workerManager;
   }
 
   /**
    * @return a new dummy tasklet.
    */
   public Tasklet newTasklet() {
-    return new Tasklet(taskletId.getAndIncrement(), null, null, new VortexFuture(executor));
+    final int id = taskletId.getAndIncrement();
+    return new Tasklet(id, null, null, new VortexFuture(executor, vortexMaster, id));
   }
 
   /**
@@ -68,6 +89,23 @@ public final class TestUtil {
   }
 
   /**
+   * @return a new dummy function.
+   */
+  public VortexFunction newInfiniteLoopFunction() {
+    return new VortexFunction() {
+      @Override
+      public Serializable call(final Serializable serializable) throws Exception {
+        while(true) {
+          Thread.sleep(100);
+          if (Thread.currentThread().isInterrupted()) {
+            throw new InterruptedException();
+          }
+        }
+      }
+    };
+  }
+
+  /**
    * @return a dummy integer-integer function.
    */
   public VortexFunction<Integer, Integer> newIntegerFunction() {

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
index f9068cd..fca17f9 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
@@ -20,13 +20,15 @@ package org.apache.reef.tests.applications.vortex;
 
 import org.apache.reef.tests.applications.vortex.addone.AddOneTest;
 import org.apache.reef.tests.applications.vortex.exception.VortexExceptionTest;
+import org.apache.reef.tests.applications.vortex.cancellation.TaskletCancellationTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
     AddOneTest.class,
-    VortexExceptionTest.class
+    VortexExceptionTest.class,
+    TaskletCancellationTest.class
     })
 public final class VortexTestSuite {
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
new file mode 100644
index 0000000..f1c982e
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.tests.applications.vortex.cancellation;
+
+import org.apache.reef.vortex.api.VortexFunction;
+
+import java.io.Serializable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Runs an infinite loop and waits for cancellation.
+ */
+public final class InfiniteLoopWithCancellationFunction implements VortexFunction {
+  private static final Logger LOG = Logger.getLogger(InfiniteLoopWithCancellationFunction.class.getName());
+
+  @Override
+  public Serializable call(final Serializable serializable) throws Exception {
+    LOG.log(Level.FINE, "Entered Infinite Loop Tasklet.");
+    while (true) {
+      Thread.sleep(100);
+      if (Thread.currentThread().isInterrupted()) {
+        throw new InterruptedException("Expected exception!");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java
new file mode 100644
index 0000000..d8ea09c
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.tests.applications.vortex.cancellation;
+
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.vortex.driver.VortexConfHelper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the cancellation of a tasklet.
+ */
+public final class TaskletCancellationTest {
+  private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+  /**
+   * Set up the test environment.
+   */
+  @Before
+  public void setUp() throws Exception {
+    this.testEnvironment.setUp();
+  }
+
+  /**
+   * Tear down the test environment.
+   */
+  @After
+  public void tearDown() throws Exception {
+    this.testEnvironment.tearDown();
+  }
+
+  @Test
+  public void testVortexTaskletCancellation() {
+    final Configuration conf =
+        VortexConfHelper.getVortexConf(
+            "TEST_Vortex_TaskletCancellationTest", TaskletCancellationTestStart.class, 2, 64, 4, 2000);
+    final LauncherStatus status = this.testEnvironment.run(conf);
+    Assert.assertTrue("Job state after execution: " + status, status.isSuccess());
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java
new file mode 100644
index 0000000..d60e0b5
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.tests.applications.vortex.cancellation;
+
+import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.api.VortexStart;
+import org.apache.reef.vortex.api.VortexThreadPool;
+import org.junit.Assert;
+
+import javax.inject.Inject;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests the cancellation of a tasklet.
+ */
+public final class TaskletCancellationTestStart implements VortexStart {
+
+  @Inject
+  private TaskletCancellationTestStart() {
+  }
+
+  @Override
+  public void start(final VortexThreadPool vortexThreadPool) {
+    final InfiniteLoopWithCancellationFunction function = new InfiniteLoopWithCancellationFunction();
+    final VortexFuture future = vortexThreadPool.submit(function, 0);
+
+    try {
+      // Hacky way to increase probability that the task has been launched.
+      // TODO[JIRA REEF-1051]: Query the VortexMaster for the Tasklet status.
+      future.get(10, TimeUnit.SECONDS);
+    } catch (final TimeoutException e) {
+      // Harmless.
+    } catch (final Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException("Unexpected exception.");
+    }
+
+    Assert.assertTrue(future.cancel(true));
+
+    try {
+      future.get();
+      Assert.fail();
+    } catch (final ExecutionException e) {
+      // Expected.
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+
+    Assert.assertTrue(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java
new file mode 100644
index 0000000..8bd04c7
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Package for testing Vortex Tasklet cancellation.
+ */
+package org.apache.reef.tests.applications.vortex.cancellation;
\ No newline at end of file