You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by yu...@apache.org on 2015/12/22 08:06:50 UTC

reef git commit: [REEF-1075] Create Tasklet abstraction to enable one-to-many mapping of "Future-like" object to TaskletIds

Repository: reef
Updated Branches:
  refs/heads/master f10bbf462 -> d6b316925


[REEF-1075] Create Tasklet abstraction to enable one-to-many mapping of "Future-like" object to TaskletIds

This addressed the issue by
  * Added VortexFutureDelegate to map a single result/failure to multiple tasklets.
  * Moved logic of calling back to Future to VortexMaster.

JIRA:
  [REEF-1075](https://issues.apache.org/jira/browse/REEF-1075)

Pull request:
  This closes #738


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d6b31692
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d6b31692
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d6b31692

Branch: refs/heads/master
Commit: d6b31692589addfa8dccf7e70892b8eebabc8093
Parents: f10bbf4
Author: Andrew Chung <af...@apache.org>
Authored: Wed Dec 16 11:27:33 2015 -0800
Committer: Yunseong Lee <yu...@apache.org>
Committed: Tue Dec 22 14:54:52 2015 +0800

----------------------------------------------------------------------
 .../reef-vortex/src/main/avro/WorkerReport.avsc |  34 ++++++-
 .../apache/reef/vortex/api/VortexFuture.java    |  46 +++++++--
 .../common/TaskletAggregationFailureReport.java |  65 ++++++++++++
 .../common/TaskletAggregationResultReport.java  |  71 +++++++++++++
 .../vortex/common/TaskletFailureReport.java     |  28 +++---
 .../reef/vortex/common/TaskletReport.java       |   4 +-
 .../reef/vortex/common/TaskletResultReport.java |  27 +++--
 .../reef/vortex/common/VortexAvroUtils.java     |  61 ++++++++++-
 .../vortex/common/VortexFutureDelegate.java     |  61 +++++++++++
 .../reef/vortex/driver/DefaultVortexMaster.java | 100 +++++++++++++++----
 .../vortex/driver/FirstFitSchedulingPolicy.java |  26 +----
 .../vortex/driver/RandomSchedulingPolicy.java   |  19 +---
 .../reef/vortex/driver/RunningWorkers.java      |  60 +----------
 .../reef/vortex/driver/SchedulingPolicy.java    |  16 +--
 .../org/apache/reef/vortex/driver/Tasklet.java  |  35 ++-----
 .../apache/reef/vortex/driver/VortexDriver.java |  30 +-----
 .../apache/reef/vortex/driver/VortexMaster.java |  15 +--
 .../reef/vortex/driver/VortexWorkerManager.java |  27 ++---
 .../reef/vortex/evaluator/VortexWorker.java     |   7 +-
 .../vortex/driver/DefaultVortexMasterTest.java  |  26 +++--
 .../reef/vortex/driver/RunningWorkersTest.java  |  14 ++-
 .../org/apache/reef/vortex/driver/TestUtil.java |  70 ++++++++++++-
 22 files changed, 567 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
index 24110fe..5d7395d 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
@@ -22,6 +22,15 @@
     "type": "record",
     "name": "AvroTaskletResultReport",
     "fields": [
+      {"name": "taskletId", "type": "int"},
+      {"name": "serializedOutput", "type": "bytes"}
+    ]
+  },
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
+    "name": "AvroTaskletAggregationResultReport",
+    "fields": [
       {
         "name": "taskletIds",
         "type":
@@ -46,6 +55,15 @@
     "type": "record",
     "name": "AvroTaskletFailureReport",
     "fields": [
+      {"name": "taskletId", "type": "int"},
+      {"name": "serializedException", "type": "bytes"}
+    ]
+  },
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
+    "name": "AvroTaskletAggregationFailureReport",
+    "fields": [
       {
         "name": "taskletIds",
         "type":
@@ -68,12 +86,24 @@
         {
           "type": "enum",
           "name": "AvroReportType",
-          "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"]
+          "symbols": [
+            "TaskletResult",
+            "TaskletAggregationResult",
+            "TaskletCancelled",
+            "TaskletFailure",
+            "TaskletAggregationFailure"
+          ]
         }
       },
       {
         "name": "taskletReport",
-        "type": ["AvroTaskletResultReport", "AvroTaskletCancelledReport", "AvroTaskletFailureReport"]
+        "type": [
+          "AvroTaskletResultReport",
+          "AvroTaskletAggregationResultReport",
+          "AvroTaskletCancelledReport",
+          "AvroTaskletFailureReport",
+          "AvroTaskletAggregationFailureReport"
+        ]
       }
     ]
   },

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
index 7f4fbb7..5e7f9a2 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
@@ -21,8 +21,11 @@ package org.apache.reef.vortex.api;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.util.Optional;
+import org.apache.reef.vortex.common.VortexFutureDelegate;
 import org.apache.reef.vortex.driver.VortexMaster;
 
+import java.io.Serializable;
+import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
@@ -32,7 +35,8 @@ import java.util.logging.Logger;
  * The interface between user code and submitted task.
  */
 @Unstable
-public final class VortexFuture<TOutput> implements Future<TOutput> {
+public final class VortexFuture<TOutput extends Serializable>
+    implements Future<TOutput>, VortexFutureDelegate<TOutput> {
   private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName());
 
   // userResult starts out as null. If not null => variable is set and tasklet returned.
@@ -173,9 +177,13 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
   }
 
   /**
-   * Called by VortexMaster to let the user know that the task completed.
+   * Called by VortexMaster to let the user know that the Tasklet completed.
    */
-  public void completed(final TOutput result) {
+  @Private
+  @Override
+  public void completed(final int pTaskletId, final TOutput result) {
+    assert taskletId == pTaskletId;
+
     this.userResult = Optional.ofNullable(result);
     if (callbackHandler != null) {
       executor.execute(new Runnable() {
@@ -189,10 +197,22 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
   }
 
   /**
-   * Called by VortexMaster to let the user know that the task threw an exception.
+   * VortexMaster should never call this.
+   */
+  @Private
+  @Override
+  public void aggregationCompleted(final List<Integer> taskletIds, final TOutput result) {
+    throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated.");
+  }
+
+  /**
+   * Called by VortexMaster to let the user know that the Tasklet threw an exception.
    */
   @Private
-  public void threwException(final Exception exception) {
+  @Override
+  public void threwException(final int pTaskletId, final Exception exception) {
+    assert taskletId == pTaskletId;
+
     this.userException = exception;
     if (callbackHandler != null) {
       executor.execute(new Runnable() {
@@ -206,10 +226,22 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
   }
 
   /**
-   * Called by VortexMaster to let the user know that the task was cancelled.
+   * VortexMaster should never call this.
    */
   @Private
-  public void cancelled() {
+  @Override
+  public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) {
+    throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated");
+  }
+
+  /**
+   * Called by VortexMaster to let the user know that the Tasklet was cancelled.
+   */
+  @Private
+  @Override
+  public void cancelled(final int pTaskletId) {
+    assert taskletId == pTaskletId;
+
     this.cancelled.set(true);
     if (callbackHandler != null) {
       executor.execute(new Runnable() {

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java
new file mode 100644
index 0000000..e6a4c82
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Report of a tasklet exception on aggregation.
+ */
+@Unstable
+public final class TaskletAggregationFailureReport implements TaskletReport {
+  private final List<Integer> taskletIds;
+  private final Exception exception;
+
+  /**
+   * @param taskletIds of the failed tasklet(s).
+   * @param exception that caused the tasklet failure.
+   */
+  public TaskletAggregationFailureReport(final List<Integer> taskletIds, final Exception exception) {
+    this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds));
+    this.exception = exception;
+  }
+
+  /**
+   * @return the type of this TaskletReport.
+   */
+  @Override
+  public TaskletReportType getType() {
+    return TaskletReportType.TaskletAggregationFailure;
+  }
+
+  /**
+   * @return the taskletIds that failed on aggregation.
+   */
+  public List<Integer> getTaskletIds() {
+    return taskletIds;
+  }
+
+  /**
+   * @return the exception that caused the tasklet aggregation failure.
+   */
+  public Exception getException() {
+    return exception;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
new file mode 100644
index 0000000..ce4a015
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Report of a Tasklet aggregation execution result.
+ */
+@Private
+@DriverSide
+@Unstable
+public final class TaskletAggregationResultReport<TOutput extends Serializable> implements TaskletReport {
+  private final List<Integer> taskletIds;
+  private final TOutput result;
+
+  /**
+   * @param taskletIds of the tasklets.
+   * @param result of the tasklet execution.
+   */
+  public TaskletAggregationResultReport(final List<Integer> taskletIds, final TOutput result) {
+    this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds));
+    this.result = result;
+  }
+
+  /**
+   * @return the type of this TaskletReport.
+   */
+  @Override
+  public TaskletReportType getType() {
+    return TaskletReportType.TaskletAggregationResult;
+  }
+
+  /**
+   * @return the TaskletId(s) of this TaskletReport
+   */
+  public List<Integer> getTaskletIds() {
+    return taskletIds;
+  }
+
+  /**
+   * @return the result of the Tasklet aggregation execution.
+   */
+  public TOutput getResult() {
+    return result;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
index 487a3d2..5c0b2de 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
@@ -19,25 +19,25 @@
 package org.apache.reef.vortex.common;
 
 import org.apache.reef.annotations.Unstable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
 
 /**
- * Report of a tasklet exception.
+ * Report of a Tasklet exception.
  */
 @Unstable
+@Private
+@DriverSide
 public final class TaskletFailureReport implements TaskletReport {
-  private final List<Integer> taskletIds;
+  private final int taskletId;
   private final Exception exception;
 
   /**
-   * @param taskletIds of the failed tasklet(s).
+   * @param taskletId of the failed Tasklet.
    * @param exception that caused the tasklet failure.
    */
-  public TaskletFailureReport(final List<Integer> taskletIds, final Exception exception) {
-    this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds));
+  public TaskletFailureReport(final int taskletId, final Exception exception) {
+    this.taskletId = taskletId;
     this.exception = exception;
   }
 
@@ -50,16 +50,14 @@ public final class TaskletFailureReport implements TaskletReport {
   }
 
   /**
-   * Returns multiple TaskletIds if an aggregation of Tasklets fail.
-   * Returns a single TaskletId if a Tasklet fails.
-   * @return the taskletId(s) of this TaskletReport.
+   * @return the taskletId of this TaskletReport.
    */
-  public List<Integer> getTaskletIds() {
-    return taskletIds;
+  public int getTaskletId() {
+    return taskletId;
   }
 
   /**
-   * @return the exception that caused the tasklet failure.
+   * @return the exception that caused the Tasklet failure.
    */
   public Exception getException() {
     return exception;

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
index 7e083eb..6392b23 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
@@ -36,8 +36,10 @@ public interface TaskletReport extends Serializable {
    */
   enum TaskletReportType {
     TaskletResult,
+    TaskletAggregationResult,
     TaskletCancelled,
-    TaskletFailure
+    TaskletFailure,
+    TaskletAggregationFailure
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
index 08e8d06..8e3bac3 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
@@ -19,26 +19,27 @@
 package org.apache.reef.vortex.common;
 
 import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 
 /**
  * Report of a tasklet execution result.
  */
 @Unstable
+@Private
+@DriverSide
 public final class TaskletResultReport<TOutput extends Serializable> implements TaskletReport {
-  private final List<Integer> taskletIds;
+  private final int taskletId;
   private final TOutput result;
 
   /**
-   * @param taskletIds of the tasklets.
-   * @param result of the tasklet execution.
+   * @param taskletId of the Tasklet.
+   * @param result of the Tasklet execution.
    */
-  public TaskletResultReport(final List<Integer> taskletIds, final TOutput result) {
-    this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds));
+  public TaskletResultReport(final int taskletId, final TOutput result) {
+    this.taskletId = taskletId;
     this.result = result;
   }
 
@@ -51,16 +52,14 @@ public final class TaskletResultReport<TOutput extends Serializable> implements
   }
 
   /**
-   * Returns multiple TaskletIds if the result is from an Aggregation.
-   * Returns a single TaskletId if the result is from a single Tasklet.
-   * @return the TaskletId(s) of this TaskletReport
+   * @return the TaskletId of this TaskletReport
    */
-  public List<Integer> getTaskletIds() {
-    return taskletIds;
+  public int getTaskletId() {
+    return taskletId;
   }
 
   /**
-   * @return the result of the tasklet execution.
+   * @return the result of the Tasklet execution.
    */
   public TOutput getResult() {
     return result;

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
index e4f747d..cc3cced 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -22,6 +22,9 @@ import org.apache.avro.io.*;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.commons.lang.SerializationUtils;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.common.avro.*;
 
@@ -35,6 +38,9 @@ import java.util.List;
 /**
  * Serialize and deserialize Vortex message to/from byte array.
  */
+@Private
+@DriverSide
+@Unstable
 public final class VortexAvroUtils {
   /**
    * Serialize VortexRequest to byte array.
@@ -101,11 +107,26 @@ public final class VortexAvroUtils {
             .setReportType(AvroReportType.TaskletResult)
             .setTaskletReport(
                 AvroTaskletResultReport.newBuilder()
-                    .setTaskletIds(taskletResultReport.getTaskletIds())
+                    .setTaskletId(taskletResultReport.getTaskletId())
                     .setSerializedOutput(ByteBuffer.wrap(serializedOutput))
                     .build())
             .build();
         break;
+      case TaskletAggregationResult:
+        final TaskletAggregationResultReport taskletAggregationResultReport =
+            (TaskletAggregationResultReport) taskletReport;
+        // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+        final byte[] serializedAggregationOutput =
+            SerializationUtils.serialize(taskletAggregationResultReport.getResult());
+        avroTaskletReport = AvroTaskletReport.newBuilder()
+            .setReportType(AvroReportType.TaskletAggregationResult)
+            .setTaskletReport(
+                AvroTaskletAggregationResultReport.newBuilder()
+                    .setTaskletIds(taskletAggregationResultReport.getTaskletIds())
+                    .setSerializedOutput(ByteBuffer.wrap(serializedAggregationOutput))
+                    .build())
+            .build();
+        break;
       case TaskletCancelled:
         final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport;
         avroTaskletReport = AvroTaskletReport.newBuilder()
@@ -123,11 +144,25 @@ public final class VortexAvroUtils {
             .setReportType(AvroReportType.TaskletFailure)
             .setTaskletReport(
                 AvroTaskletFailureReport.newBuilder()
-                    .setTaskletIds(taskletFailureReport.getTaskletIds())
+                    .setTaskletId(taskletFailureReport.getTaskletId())
                     .setSerializedException(ByteBuffer.wrap(serializedException))
                     .build())
             .build();
         break;
+      case TaskletAggregationFailure:
+        final TaskletAggregationFailureReport taskletAggregationFailureReport =
+            (TaskletAggregationFailureReport) taskletReport;
+        final byte[] serializedAggregationException =
+            SerializationUtils.serialize(taskletAggregationFailureReport.getException());
+        avroTaskletReport = AvroTaskletReport.newBuilder()
+            .setReportType(AvroReportType.TaskletAggregationFailure)
+            .setTaskletReport(
+                AvroTaskletAggregationFailureReport.newBuilder()
+                    .setTaskletIds(taskletAggregationFailureReport.getTaskletIds())
+                    .setSerializedException(ByteBuffer.wrap(serializedAggregationException))
+                    .build())
+            .build();
+        break;
       default:
         throw new RuntimeException("Undefined message type");
       }
@@ -197,7 +232,16 @@ public final class VortexAvroUtils {
         // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
         final Serializable output =
             (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
-        taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletIds(), output);
+        taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
+        break;
+      case TaskletAggregationResult:
+        final AvroTaskletAggregationResultReport taskletAggregationResultReport =
+            (AvroTaskletAggregationResultReport)avroTaskletReport.getTaskletReport();
+        final Serializable aggregationOutput =
+            (Serializable) SerializationUtils.deserialize(
+                taskletAggregationResultReport.getSerializedOutput().array());
+        taskletReport =
+            new TaskletAggregationResultReport<>(taskletAggregationResultReport.getTaskletIds(), aggregationOutput);
         break;
       case TaskletCancelled:
         final AvroTaskletCancelledReport taskletCancelledReport =
@@ -209,7 +253,16 @@ public final class VortexAvroUtils {
             (AvroTaskletFailureReport)avroTaskletReport.getTaskletReport();
         final Exception exception =
             (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
-        taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletIds(), exception);
+        taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
+        break;
+      case TaskletAggregationFailure:
+        final AvroTaskletAggregationFailureReport taskletAggregationFailureReport =
+            (AvroTaskletAggregationFailureReport)avroTaskletReport.getTaskletReport();
+        final Exception aggregationException =
+            (Exception) SerializationUtils.deserialize(
+                taskletAggregationFailureReport.getSerializedException().array());
+        taskletReport =
+            new TaskletAggregationFailureReport(taskletAggregationFailureReport.getTaskletIds(), aggregationException);
         break;
       default:
         throw new RuntimeException("Undefined TaskletReport type");

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
new file mode 100644
index 0000000..55f3cf5
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Exposes functions to be called by the {@link org.apache.reef.vortex.driver.VortexMaster}
+ * to note that a list of Tasklets associated with a Future has completed.
+ */
+@Unstable
+@DriverSide
+@Private
+public interface VortexFutureDelegate<TOutput extends Serializable> {
+
+  /**
+   * A Tasklet associated with the future has completed with a result.
+   */
+  void completed(final int taskletId, final TOutput result);
+
+  /**
+   * The list of aggregated Tasklets associated with the Future that have completed with a result.
+   */
+  void aggregationCompleted(final List<Integer> taskletIds, final TOutput result);
+
+  /**
+   * A Tasklet associated with the Future has thrown an Exception.
+   */
+  void threwException(final int taskletId, final Exception exception);
+
+  /**
+   * The list of Tasklets associated with the Future that have thrown an Exception.
+   */
+  void aggregationThrewException(final List<Integer> taskletIds, final Exception exception);
+
+  /**
+   * A Tasklet associated with the Future has been cancelled.
+   */
+  void cancelled(final int taskletId);
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index cb62049..80c3cb0 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -25,6 +25,7 @@ import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.FutureCallback;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.common.*;
 
 import javax.inject.Inject;
 import java.io.Serializable;
@@ -40,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 @ThreadSafe
 @DriverSide
 final class DefaultVortexMaster implements VortexMaster {
+  private final Map<Integer, VortexFutureDelegate> taskletFutureMap = new HashMap<>();
   private final AtomicInteger taskletIdCounter = new AtomicInteger();
   private final RunningWorkers runningWorkers;
   private final PendingTasklets pendingTasklets;
@@ -74,7 +76,9 @@ final class DefaultVortexMaster implements VortexMaster {
     }
 
     final Tasklet tasklet = new Tasklet<>(id, function, input, vortexFuture);
+    putDelegate(Collections.singletonList(tasklet), vortexFuture);
     this.pendingTasklets.addLast(tasklet);
+
     return vortexFuture;
   }
 
@@ -107,37 +111,97 @@ final class DefaultVortexMaster implements VortexMaster {
     }
   }
 
-  /**
-   * Notify task completion to runningWorkers.
-   */
   @Override
-  public void taskletCompleted(final String workerId,
-                               final int taskletId,
-                               final Serializable result) {
-    runningWorkers.completeTasklet(workerId, taskletId, result);
+  public void workerReported(final String workerId, final WorkerReport workerReport) {
+    for (final TaskletReport taskletReport : workerReport.getTaskletReports()) {
+      switch (taskletReport.getType()) {
+      case TaskletResult:
+        final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport;
+
+        final int resultTaskletId = taskletResultReport.getTaskletId();
+        final List<Integer> singletonResultTaskletId = Collections.singletonList(resultTaskletId);
+        runningWorkers.doneTasklets(workerId, singletonResultTaskletId);
+        fetchDelegate(singletonResultTaskletId).completed(resultTaskletId, taskletResultReport.getResult());
+
+        break;
+      case TaskletAggregationResult:
+        final TaskletAggregationResultReport taskletAggregationResultReport =
+            (TaskletAggregationResultReport) taskletReport;
+
+        final List<Integer> aggregatedTaskletIds = taskletAggregationResultReport.getTaskletIds();
+        runningWorkers.doneTasklets(workerId, aggregatedTaskletIds);
+        fetchDelegate(aggregatedTaskletIds).aggregationCompleted(
+            aggregatedTaskletIds, taskletAggregationResultReport.getResult());
+
+        break;
+      case TaskletCancelled:
+        final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport;
+        final List<Integer> cancelledIdToList = Collections.singletonList(taskletCancelledReport.getTaskletId());
+        runningWorkers.doneTasklets(workerId, cancelledIdToList);
+        fetchDelegate(cancelledIdToList).cancelled(taskletCancelledReport.getTaskletId());
+
+        break;
+      case TaskletFailure:
+        final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport;
+
+        final int failureTaskletId = taskletFailureReport.getTaskletId();
+        final List<Integer> singletonFailedTaskletId = Collections.singletonList(failureTaskletId);
+        runningWorkers.doneTasklets(workerId, singletonFailedTaskletId);
+        fetchDelegate(singletonFailedTaskletId).threwException(failureTaskletId, taskletFailureReport.getException());
+
+        break;
+      case TaskletAggregationFailure:
+        final TaskletAggregationFailureReport taskletAggregationFailureReport =
+            (TaskletAggregationFailureReport) taskletReport;
+
+        final List<Integer> aggregationFailedTaskletIds = taskletAggregationFailureReport.getTaskletIds();
+        runningWorkers.doneTasklets(workerId, aggregationFailedTaskletIds);
+        fetchDelegate(aggregationFailedTaskletIds).aggregationThrewException(aggregationFailedTaskletIds,
+            taskletAggregationFailureReport.getException());
+        break;
+      default:
+        throw new RuntimeException("Unknown Report");
+      }
+    }
   }
 
   /**
-   * Notify task failure to runningWorkers.
+   * Terminate the job.
    */
   @Override
-  public void taskletErrored(final String workerId, final int taskletId, final Exception exception) {
-    runningWorkers.errorTasklet(workerId, taskletId, exception);
+  public void terminate() {
+    runningWorkers.terminate();
   }
 
   /**
-   * Notify tasklet cancellation to runningWorkers.
+   * Puts a delegate to associate with a Tasklet.
    */
-  @Override
-  public void taskletCancelled(final String workerId, final int taskletId) {
-    runningWorkers.taskletCancelled(workerId, taskletId);
+  private synchronized void putDelegate(final List<Tasklet> tasklets, final VortexFutureDelegate delegate) {
+    for (final Tasklet tasklet : tasklets) {
+      taskletFutureMap.put(tasklet.getId(), delegate);
+    }
   }
 
   /**
-   * Terminate the job.
+   * Fetches a delegate that maps to the list of Tasklets.
    */
-  @Override
-  public void terminate() {
-    runningWorkers.terminate();
+  private synchronized VortexFutureDelegate fetchDelegate(final List<Integer> taskletIds) {
+    VortexFutureDelegate delegate = null;
+    for (final int taskletId : taskletIds) {
+      final VortexFutureDelegate currDelegate = taskletFutureMap.remove(taskletId);
+      if (currDelegate == null) {
+        // TODO[JIRA REEF-500]: Consider duplicate tasklets.
+        throw new RuntimeException("Tasklet should only be removed once.");
+      }
+
+      if (delegate == null) {
+        delegate = currDelegate;
+      } else {
+        assert delegate == currDelegate;
+      }
+    }
+
+    return delegate;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
index 8b28eab..489751f 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
@@ -125,33 +125,17 @@ class FirstFitSchedulingPolicy implements SchedulingPolicy {
 
   /**
    * @param vortexWorker that the tasklet completed in
-   * @param tasklet completed
+   * @param tasklets completed
    */
   @Override
-  public void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
+  public void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets) {
     final String workerId = vortexWorker.getId();
-    removeTasklet(workerId);
+    removeTasklet(workerId, tasklets);
   }
 
-  /**
-   * @param vortexWorker that the tasklet failed in
-   * @param tasklet failed
-   */
-  @Override
-  public void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
-    final String workerId = vortexWorker.getId();
-    removeTasklet(workerId);
-  }
-
-  @Override
-  public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
-    final String workerId = vortexWorker.getId();
-    removeTasklet(workerId);
-  }
-
-  private void removeTasklet(final String workerId) {
+  private void removeTasklet(final String workerId, final List<Tasklet> tasklets) {
     if (idLoadMap.containsKey(workerId)) {
-      idLoadMap.put(workerId, Math.max(0, idLoadMap.get(workerId) - 1));
+      idLoadMap.put(workerId, Math.max(0, idLoadMap.get(workerId) - tasklets.size()));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
index ffa2fbf..76dc5ca 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
@@ -90,24 +90,7 @@ class RandomSchedulingPolicy implements SchedulingPolicy {
    * Do nothing.
    */
   @Override
-  public void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
+  public void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets) {
     // Do nothing
   }
-
-  /**
-   * Do nothing.
-   */
-  @Override
-  public void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
-    // Do nothing
-  }
-
-  /**
-   * Do nothing.
-   */
-  @Override
-  public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
-    // Do nothing
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
index 7b05c0c..a1fb96f 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
@@ -25,7 +25,6 @@ import org.apache.reef.util.Optional;
 
 import javax.inject.Inject;
 
-import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -184,69 +183,18 @@ final class RunningWorkers {
    * Parameter: Same arguments can come in multiple times.
    * (e.g. preemption message coming before tasklet completion message multiple times)
    */
-  void completeTasklet(final String workerId,
-                       final int taskletId,
-                       final Serializable result) {
+  void doneTasklets(final String workerId, final List<Integer> taskletIds) {
     lock.lock();
     try {
       if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before
         final VortexWorkerManager worker = this.runningWorkers.get(workerId);
-        final Tasklet tasklet = worker.taskletCompleted(taskletId, result);
-        this.schedulingPolicy.taskletCompleted(worker, tasklet);
+        final List<Tasklet> tasklets = worker.taskletsDone(taskletIds);
+        this.schedulingPolicy.taskletsDone(worker, tasklets);
 
-        // Notify (possibly) waiting scheduler
-        noWorkerOrResource.signal();
-
-        taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Concurrency: Called by multiple threads.
-   * Parameter: Same arguments can come in multiple times.
-   * (e.g. preemption message coming before tasklet error message multiple times)
-   */
-  void errorTasklet(final String workerId,
-                    final int taskletId,
-                    final Exception exception) {
-    lock.lock();
-    try {
-      if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before
-        final VortexWorkerManager worker = this.runningWorkers.get(workerId);
-        final Tasklet tasklet = worker.taskletThrewException(taskletId, exception);
-        this.schedulingPolicy.taskletFailed(worker, tasklet);
-
-        // Notify (possibly) waiting scheduler
-        noWorkerOrResource.signal();
-
-        taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Concurrency: Called by multiple threads.
-   * Parameter: Same arguments can come in multiple times.
-   * (e.g. preemption message coming before tasklet error message multiple times)
-   */
-  void taskletCancelled(final String workerId,
-                        final int taskletId) {
-    lock.lock();
-    try {
-      if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before
-        final VortexWorkerManager worker = this.runningWorkers.get(workerId);
-        final Tasklet tasklet = worker.taskletCancelled(taskletId);
-        this.schedulingPolicy.taskletCancelled(worker, tasklet);
+        taskletsToCancel.removeAll(taskletIds); // cleanup to prevent memory leak.
 
         // Notify (possibly) waiting scheduler
         noWorkerOrResource.signal();
-
-        taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
       }
     } finally {
       lock.unlock();

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
index a058f6f..bccc0ea 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
@@ -21,6 +21,8 @@ package org.apache.reef.vortex.driver;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 import org.apache.reef.util.Optional;
 
+import java.util.List;
+
 /**
  * For choosing which worker to schedule the tasklet onto.
  */
@@ -49,17 +51,7 @@ interface SchedulingPolicy {
   void taskletLaunched(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
 
   /**
-   * Tasklet completed.
-   */
-  void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
-
-  /**
-   * Tasklet failed.
-   */
-  void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
-
-  /**
-   * Tasklet cancelled.
+   * Tasklets completed.
    */
-  void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
+  void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets);
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
index b0138d0..24db3cb 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
@@ -20,7 +20,7 @@ package org.apache.reef.vortex.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.vortex.api.VortexFunction;
-import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.common.VortexFutureDelegate;
 
 import java.io.Serializable;
 
@@ -32,16 +32,16 @@ class Tasklet<TInput extends Serializable, TOutput extends Serializable> {
   private final int taskletId;
   private final VortexFunction<TInput, TOutput> userTask;
   private final TInput input;
-  private final VortexFuture<TOutput> vortexFuture;
+  private final VortexFutureDelegate delegate;
 
   Tasklet(final int taskletId,
           final VortexFunction<TInput, TOutput> userTask,
           final TInput input,
-          final VortexFuture<TOutput> vortexFuture) {
+          final VortexFutureDelegate delegate) {
     this.taskletId = taskletId;
     this.userTask = userTask;
     this.input = input;
-    this.vortexFuture = vortexFuture;
+    this.delegate = delegate;
   }
 
   /**
@@ -66,31 +66,10 @@ class Tasklet<TInput extends Serializable, TOutput extends Serializable> {
   }
 
   /**
-   * Called by VortexMaster to let the user know that the task completed.
+   * Called by {@link RunningWorkers} to cancel the Tasklet before launch.
    */
-  void completed(final TOutput result) {
-    vortexFuture.completed(result);
-  }
-
-  /**
-   * Called by VortexMaster to let the user know that the task threw an exception.
-   */
-  void threwException(final Exception exception) {
-    vortexFuture.threwException(exception);
-  }
-
-  /**
-   * Called by VortexMaster to let the user know that the task has been cancelled.
-   */
-  void cancelled(){
-    vortexFuture.cancelled();
-  }
-
-  /**
-   * For tests.
-   */
-  boolean isCompleted() {
-    return vortexFuture.isDone();
+  void cancelled() {
+    delegate.cancelled(taskletId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index 993b32f..f581f99 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -37,7 +37,6 @@ import org.apache.reef.wake.impl.ThreadPoolStage;
 import org.apache.reef.wake.time.event.StartTime;
 
 import javax.inject.Inject;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -153,34 +152,7 @@ final class VortexDriver {
     public void onNext(final TaskMessage taskMessage) {
       final String workerId = taskMessage.getId();
       final WorkerReport workerReport = VortexAvroUtils.toWorkerReport(taskMessage.get());
-
-      // TODO[JIRA REEF-942]: Fix when aggregation is allowed.
-      assert workerReport.getTaskletReports().size() == 1;
-
-      final TaskletReport taskletReport = workerReport.getTaskletReports().get(0);
-      switch (taskletReport.getType()) {
-      case TaskletResult:
-        final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport;
-
-        // TODO[JIRA REEF-942]: Fix when aggregation is allowed.
-        final List<Integer> resultTaskletIds = taskletResultReport.getTaskletIds();
-
-        assert resultTaskletIds.size() == 1;
-        vortexMaster.taskletCompleted(workerId, resultTaskletIds.get(0),
-            taskletResultReport.getResult());
-        break;
-      case TaskletCancelled:
-        final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport;
-        vortexMaster.taskletCancelled(workerId, taskletCancelledReport.getTaskletId());
-        break;
-      case TaskletFailure:
-        final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport;
-        vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletIds().get(0),
-            taskletFailureReport.getException());
-        break;
-      default:
-        throw new RuntimeException("Unknown Report");
-      }
+      vortexMaster.workerReported(workerId, workerReport);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
index 38bbcc5..becf3f9 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -25,6 +25,7 @@ import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.FutureCallback;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.common.WorkerReport;
 
 import java.io.Serializable;
 
@@ -62,19 +63,9 @@ public interface VortexMaster {
   void workerPreempted(final String id);
 
   /**
-   * Call this when a Tasklet is completed.
+   * Call this when a worker has reported back.
    */
-  void taskletCompleted(final String workerId, final int taskletId, final Serializable result);
-
-  /**
-   * Call this when a Tasklet errored.
-   */
-  void taskletErrored(final String workerId, final int taskletId, final Exception exception);
-
-  /**
-   * Call this when a Tasklet is cancelled and the cancellation is honored.
-   */
-  void taskletCancelled(final String workerId, final int taskletId);
+  void workerReported(final String workerId, final WorkerReport workerReport);
 
   /**
    * Release all resources and shut down.

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
index 76d6cec..ffba985 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
@@ -25,8 +25,7 @@ import org.apache.reef.vortex.common.TaskletCancellationRequest;
 import org.apache.reef.vortex.common.TaskletExecutionRequest;
 
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
+import java.util.*;
 
 /**
  * Representation of a VortexWorkerManager in Driver.
@@ -57,25 +56,13 @@ class VortexWorkerManager {
     vortexRequestor.send(reefTask, cancellationRequest);
   }
 
-  <TOutput extends Serializable> Tasklet taskletCompleted(final Integer taskletId, final TOutput result) {
-    final Tasklet<?, TOutput> tasklet = runningTasklets.remove(taskletId);
-    assert tasklet != null; // Tasklet should complete/error only once
-    tasklet.completed(result);
-    return tasklet;
-  }
-
-  Tasklet taskletThrewException(final Integer taskletId, final Exception exception) {
-    final Tasklet tasklet = runningTasklets.remove(taskletId);
-    assert tasklet != null; // Tasklet should complete/error only once
-    tasklet.threwException(exception);
-    return tasklet;
-  }
+  List<Tasklet> taskletsDone(final List<Integer> taskletIds) {
+    final List<Tasklet> taskletList = new ArrayList<>();
+    for (final int taskletId : taskletIds) {
+      taskletList.add(runningTasklets.remove(taskletId));
+    }
 
-  Tasklet taskletCancelled(final Integer taskletId) {
-    final Tasklet tasklet = runningTasklets.remove(taskletId);
-    assert tasklet != null; // Tasklet should finish only once.
-    tasklet.cancelled();
-    return tasklet;
+    return Collections.unmodifiableList(taskletList);
   }
 
   Collection<Tasklet> removed() {

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index 26d4287..920f1a9 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -36,7 +36,6 @@ import org.apache.reef.wake.EventHandler;
 import javax.inject.Inject;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.logging.Level;
@@ -111,8 +110,7 @@ public final class VortexWorker implements Task, TaskMessageSource {
                         // Command Executor: Execute the command
                         final Serializable result = taskletExecutionRequest.execute();
                         final TaskletReport taskletReport =
-                            new TaskletResultReport<>(Collections.singletonList(
-                                taskletExecutionRequest.getTaskletId()), result);
+                            new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
                         taskletReports.add(taskletReport);
                       } catch (final InterruptedException ex) {
                         // Assumes that user's thread follows convention that cancelled Futures
@@ -124,8 +122,7 @@ public final class VortexWorker implements Task, TaskMessageSource {
                       } catch (Exception e) {
                         // Command Executor: Tasklet throws an exception
                         final TaskletReport taskletReport =
-                            new TaskletFailureReport(Collections.singletonList(
-                                taskletExecutionRequest.getTaskletId()), e);
+                            new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
                         taskletReports.add(taskletReport);
                       }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
index 6c2162a..325d3d4 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
@@ -22,6 +22,10 @@ import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.FutureCallback;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.common.TaskletFailureReport;
+import org.apache.reef.vortex.common.TaskletReport;
+import org.apache.reef.vortex.common.TaskletResultReport;
+import org.apache.reef.vortex.common.WorkerReport;
 import org.junit.Test;
 
 import java.util.*;
@@ -71,7 +75,9 @@ public class DefaultVortexMasterTest {
 
     final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1);
     for (final int taskletId : taskletIds) {
-      vortexMaster.taskletCompleted(vortexWorkerManager1.getId(), taskletId, null);
+      final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null);
+      vortexMaster.workerReported(
+          vortexWorkerManager1.getId(), new WorkerReport(Collections.singletonList(taskletReport)));
     }
 
     assertTrue("The VortexFuture should be done", future.isDone());
@@ -106,9 +112,12 @@ public class DefaultVortexMasterTest {
     final ArrayList<Integer> taskletIds2 = launchTasklets(runningWorkers, pendingTasklets, 1);
     assertEquals("Both lists need to contain the same single tasklet id", taskletIds1, taskletIds2);
 
+
     // Completed?
     for (final int taskletId : taskletIds2) {
-      vortexMaster.taskletCompleted(vortexWorkerManager2.getId(), taskletId, null);
+      final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null);
+      vortexMaster.workerReported(
+          vortexWorkerManager2.getId(), new WorkerReport(Collections.singletonList(taskletReport)));
     }
     assertTrue("The VortexFuture should be done", future.isDone());
   }
@@ -157,7 +166,9 @@ public class DefaultVortexMasterTest {
     for (final int taskletId : taskletIds2) {
       final String workerId = runningWorkers.getWhereTaskletWasScheduledTo(taskletId);
       assertNotNull("The tasklet must have been scheduled", workerId);
-      vortexMaster.taskletCompleted(workerId, taskletId, null);
+      final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null);
+      vortexMaster.workerReported(
+          workerId, new WorkerReport(Collections.singletonList(taskletReport)));
     }
     for (final VortexFuture vortexFuture : vortexFutures) {
       assertTrue("The VortexFuture should be done", vortexFuture.isDone());
@@ -196,8 +207,11 @@ public class DefaultVortexMasterTest {
     final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null, Optional.of(testCallbackHandler));
 
     final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1);
+
     for (final int taskletId : taskletIds) {
-      vortexMaster.taskletErrored(vortexWorkerManager1.getId(), taskletId, new RuntimeException("Test exception"));
+      final TaskletReport taskletReport = new TaskletFailureReport(taskletId, new RuntimeException("Test exception."));
+      vortexMaster.workerReported(
+          vortexWorkerManager1.getId(), new WorkerReport(Collections.singletonList(taskletReport)));
     }
 
     assertTrue("The VortexFuture should be done", future.isDone());
@@ -246,9 +260,9 @@ public class DefaultVortexMasterTest {
   private VortexFuture createTaskletCancellationFuture(final RunningWorkers runningWorkers,
                                                        final PendingTasklets pendingTasklets) {
     final VortexFunction vortexFunction = testUtil.newInfiniteLoopFunction();
-    final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
-
     final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
+    final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(vortexMaster);
+
 
     // Allocate worker & tasklet and schedule
     vortexMaster.workerAllocated(vortexWorkerManager1);

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
index 7cd0847..76416cb 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
@@ -20,7 +20,9 @@ package org.apache.reef.vortex.driver;
 
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -30,8 +32,9 @@ import static org.junit.Assert.assertTrue;
  * Test Possible Race Conditions.
  */
 public class RunningWorkersTest {
-  private final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy());
   private final TestUtil testUtil = new TestUtil();
+  private final TestUtil.TestSchedulingPolicy schedulingPolicy = testUtil.newSchedulingPolicy();
+  private final RunningWorkers runningWorkers = new RunningWorkers(schedulingPolicy);
 
   /**
    * Test executor preemption -> executor allocation.
@@ -58,7 +61,12 @@ public class RunningWorkersTest {
     final Collection<Tasklet> tasklets = runningWorkers.removeWorker(vortexWorkerManager.getId()).get();
     assertEquals("Only 1 Tasklet must have been running", 1, tasklets.size());
     assertTrue("This Tasklet must have been running", tasklets.contains(tasklet));
-    runningWorkers.completeTasklet(vortexWorkerManager.getId(), tasklet.getId(), null);
-    assertFalse("Tasklet must not have been completed", tasklet.isCompleted());
+    final List<Integer> taskletIds = new ArrayList<>();
+    for (final Tasklet taskletIter : tasklets) {
+      taskletIds.add(taskletIter.getId());
+    }
+
+    runningWorkers.doneTasklets(vortexWorkerManager.getId(), taskletIds);
+    assertFalse("Tasklet must not have been completed", schedulingPolicy.taskletIsDone(tasklet.getId()));
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
index 8704b0b..ab93a08 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
@@ -19,14 +19,18 @@
 package org.apache.reef.vortex.driver;
 
 import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
-import org.apache.reef.vortex.common.TaskletCancellationRequest;
-import org.apache.reef.vortex.common.VortexRequest;
+import org.apache.reef.vortex.common.*;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -46,9 +50,16 @@ public final class TestUtil {
   private final VortexMaster vortexMaster = mock(VortexMaster.class);
 
   /**
-   * @return a new mocked worker.
+   * @return a new mocked worker, with a mocked {@link VortexMaster}.
    */
   public VortexWorkerManager newWorker() {
+    return newWorker(vortexMaster);
+  }
+
+  /**
+   * @return a new mocked worker, with the {@link VortexMaster} passed in.
+   */
+  public VortexWorkerManager newWorker(final VortexMaster master) {
     final RunningTask reefTask = mock(RunningTask.class);
     when(reefTask.getId()).thenReturn("worker" + String.valueOf(workerId.getAndIncrement()));
     final VortexRequestor vortexRequestor = mock(VortexRequestor.class);
@@ -58,7 +69,8 @@ public final class TestUtil {
       public Object answer(final InvocationOnMock invocation) throws Throwable {
         final VortexRequest request = (VortexRequest)invocation.getArguments()[1];
         if (request instanceof TaskletCancellationRequest) {
-          workerManager.taskletCancelled(request.getTaskletId());
+          final TaskletReport cancelReport = new TaskletCancelledReport(request.getTaskletId());
+          master.workerReported(workerManager.getId(), new WorkerReport(Collections.singleton(cancelReport)));
         }
 
         return null;
@@ -89,6 +101,13 @@ public final class TestUtil {
   }
 
   /**
+   * @return a queryable {@link org.apache.reef.vortex.driver.TestUtil.TestSchedulingPolicy}
+   */
+  public TestSchedulingPolicy newSchedulingPolicy() {
+    return new TestSchedulingPolicy();
+  }
+
+  /**
    * @return a new dummy function.
    */
   public VortexFunction newInfiniteLoopFunction() {
@@ -116,4 +135,47 @@ public final class TestUtil {
       }
     };
   }
+
+  static final class TestSchedulingPolicy implements SchedulingPolicy  {
+    private final SchedulingPolicy policy = new RandomSchedulingPolicy();
+    private final Set<Integer> doneTasklets = new HashSet<>();
+
+    private TestSchedulingPolicy() {
+    }
+
+    @Override
+    public Optional<String> trySchedule(final Tasklet tasklet) {
+      return policy.trySchedule(tasklet);
+    }
+
+    @Override
+    public void workerAdded(final VortexWorkerManager vortexWorker) {
+      policy.workerAdded(vortexWorker);
+    }
+
+    @Override
+    public void workerRemoved(final VortexWorkerManager vortexWorker) {
+      policy.workerRemoved(vortexWorker);
+    }
+
+    @Override
+    public void taskletLaunched(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
+      policy.taskletLaunched(vortexWorker, tasklet);
+    }
+
+    @Override
+    public void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets) {
+      policy.taskletsDone(vortexWorker, tasklets);
+      for (final Tasklet t : tasklets) {
+        doneTasklets.add(t.getId());
+      }
+    }
+
+    /**
+     * @return true if Tasklet with taskletId is done, false otherwise.
+     */
+    public boolean taskletIsDone(final int taskletId) {
+      return doneTasklets.contains(taskletId);
+    }
+  }
 }