You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by bg...@apache.org on 2015/12/15 04:18:15 UTC

reef git commit: [REEF-1077] Allow VortexWorker reports to report about more than a single Tasklet

Repository: reef
Updated Branches:
  refs/heads/master ccecdd577 -> bc5e14dcc


[REEF-1077] Allow VortexWorker reports to report about more than a single Tasklet

This addressed the issue by
  * Change the original WorkerReport to TaskletReport.
  * Allow WorkerReports to hold a List of TaskletReports.

JIRA:
  [REEF-1077](https://issues.apache.org/jira/browse/REEF-1077)

Pull Request:
  Closes #729


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bc5e14dc
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bc5e14dc
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bc5e14dc

Branch: refs/heads/master
Commit: bc5e14dcc648b489d61bc09155fd523ecd6d276a
Parents: ccecdd5
Author: Andrew Chung <af...@apache.org>
Authored: Sun Dec 13 16:49:02 2015 -0800
Committer: Byung-Gon Chun <bg...@apache.org>
Committed: Tue Dec 15 12:16:29 2015 +0900

----------------------------------------------------------------------
 .../reef-vortex/src/main/avro/WorkerReport.avsc |  23 ++-
 .../vortex/common/TaskletCancelledReport.java   |   6 +-
 .../vortex/common/TaskletFailureReport.java     |   8 +-
 .../reef/vortex/common/TaskletReport.java       |  52 +++++++
 .../reef/vortex/common/TaskletResultReport.java |   8 +-
 .../reef/vortex/common/VortexAvroUtils.java     | 150 +++++++++++--------
 .../apache/reef/vortex/common/WorkerReport.java |  34 +++--
 .../apache/reef/vortex/driver/VortexDriver.java |  19 +--
 .../reef/vortex/evaluator/VortexWorker.java     |  20 ++-
 9 files changed, 211 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
index 11adb12..182f88e 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
@@ -46,14 +46,16 @@
   {
     "namespace": "org.apache.reef.vortex.common.avro",
     "type": "record",
-    "name": "AvroWorkerReport",
+    "name": "AvroTaskletReport",
     "fields": [
       {
         "name": "reportType",
-        "type": {
+        "type":
+        {
           "type": "enum",
           "name": "AvroReportType",
-          "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"]}
+          "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"]
+        }
       },
       {
         "name": "taskletReport",
@@ -61,5 +63,20 @@
         "default": null
       }
     ]
+  },
+  {
+    "namespace": "org.apache.reef.vortex.common.avro",
+    "type": "record",
+    "name": "AvroWorkerReport",
+    "fields": [
+      {
+        "name": "taskletReports",
+        "type":
+        {
+          "type": "array",
+          "items": "AvroTaskletReport"
+        }
+      }
+    ]
   }
 ]

http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
index b51219b..1ec1890 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
@@ -24,7 +24,7 @@ import org.apache.reef.annotations.Unstable;
  * The report of a cancelled Tasklet.
  */
 @Unstable
-public final class TaskletCancelledReport implements WorkerReport {
+public final class TaskletCancelledReport implements TaskletReport {
   private int taskletId;
 
   /**
@@ -35,8 +35,8 @@ public final class TaskletCancelledReport implements WorkerReport {
   }
 
   @Override
-  public WorkerReportType getType() {
-    return WorkerReportType.TaskletCancelled;
+  public TaskletReportType getType() {
+    return TaskletReportType.TaskletCancelled;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
index 96df2e1..cbf6953 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
@@ -24,7 +24,7 @@ import org.apache.reef.annotations.Unstable;
  * Report of a tasklet exception.
  */
 @Unstable
-public final class TaskletFailureReport implements WorkerReport {
+public final class TaskletFailureReport implements TaskletReport {
   private final int taskletId;
   private final Exception exception;
 
@@ -38,11 +38,11 @@ public final class TaskletFailureReport implements WorkerReport {
   }
 
   /**
-   * @return the type of this WorkerReport.
+   * @return the type of this TaskletReport.
    */
   @Override
-  public WorkerReportType getType() {
-    return WorkerReportType.TaskletFailure;
+  public TaskletReportType getType() {
+    return TaskletReportType.TaskletFailure;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
new file mode 100644
index 0000000..196f597
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.Serializable;
+
+/**
+ * The interface for a status report from the {@link org.apache.reef.vortex.evaluator.VortexWorker}.
+ */
+@Unstable
+@Private
+@DriverSide
+public interface TaskletReport extends Serializable {
+  /**
+   * Type of TaskletReport.
+   */
+  enum TaskletReportType {
+    TaskletResult,
+    TaskletCancelled,
+    TaskletFailure
+  }
+
+  /**
+   * @return the type of this TaskletReport.
+   */
+  TaskletReportType getType();
+
+  /**
+   * @return the taskletId of this TaskletReport.
+   */
+  int getTaskletId();
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
index cd3a597..de05185 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
  * Report of a tasklet execution result.
  */
 @Unstable
-public final class TaskletResultReport<TOutput extends Serializable> implements WorkerReport {
+public final class TaskletResultReport<TOutput extends Serializable> implements TaskletReport {
   private final int taskletId;
   private final TOutput result;
 
@@ -40,11 +40,11 @@ public final class TaskletResultReport<TOutput extends Serializable> implements
   }
 
   /**
-   * @return the type of this WorkerReport.
+   * @return the type of this TaskletReport.
    */
   @Override
-  public WorkerReportType getType() {
-    return WorkerReportType.TaskletResult;
+  public TaskletReportType getType() {
+    return TaskletReportType.TaskletResult;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
index 4ab4cb5..660f059 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -29,6 +29,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Serialize and deserialize Vortex message to/from byte array.
@@ -86,48 +88,58 @@ public final class VortexAvroUtils {
    * @return Serialized byte array.
    */
   public static byte[] toBytes(final WorkerReport workerReport) {
-    // Convert WorkerReport message to Avro message.
-    final AvroWorkerReport avroWorkerReport;
-    switch (workerReport.getType()) {
-    case TaskletResult:
-      final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
-      // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
-      final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult());
-      avroWorkerReport = AvroWorkerReport.newBuilder()
-          .setReportType(AvroReportType.TaskletResult)
-          .setTaskletReport(
-              AvroTaskletResultReport.newBuilder()
-                  .setTaskletId(taskletResultReport.getTaskletId())
-                  .setSerializedOutput(ByteBuffer.wrap(serializedOutput))
-                  .build())
-          .build();
-      break;
-    case TaskletCancelled:
-      final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) workerReport;
-      avroWorkerReport = AvroWorkerReport.newBuilder()
-          .setReportType(AvroReportType.TaskletCancelled)
-          .setTaskletReport(
-              AvroTaskletCancelledReport.newBuilder()
-                  .setTaskletId(workerReport.getTaskletId())
-                  .build())
-          .build();
-      break;
-    case TaskletFailure:
-      final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
-      final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException());
-      avroWorkerReport = AvroWorkerReport.newBuilder()
-          .setReportType(AvroReportType.TaskletFailure)
-          .setTaskletReport(
-              AvroTaskletFailureReport.newBuilder()
-                  .setTaskletId(taskletFailureReport.getTaskletId())
-                  .setSerializedException(ByteBuffer.wrap(serializedException))
-                  .build())
-          .build();
-      break;
-    default:
-      throw new RuntimeException("Undefined message type");
+    final List<AvroTaskletReport> workerTaskletReports = new ArrayList<>();
+
+    for (final TaskletReport taskletReport : workerReport.getTaskletReports()) {
+      final AvroTaskletReport avroTaskletReport;
+      switch (taskletReport.getType()) {
+      case TaskletResult:
+        final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport;
+        // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+        final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult());
+        avroTaskletReport = AvroTaskletReport.newBuilder()
+            .setReportType(AvroReportType.TaskletResult)
+            .setTaskletReport(
+                AvroTaskletResultReport.newBuilder()
+                    .setTaskletId(taskletResultReport.getTaskletId())
+                    .setSerializedOutput(ByteBuffer.wrap(serializedOutput))
+                    .build())
+            .build();
+        break;
+      case TaskletCancelled:
+        final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport;
+        avroTaskletReport = AvroTaskletReport.newBuilder()
+            .setReportType(AvroReportType.TaskletCancelled)
+            .setTaskletReport(
+                AvroTaskletCancelledReport.newBuilder()
+                    .setTaskletId(taskletCancelledReport.getTaskletId())
+                    .build())
+            .build();
+        break;
+      case TaskletFailure:
+        final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport;
+        final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException());
+        avroTaskletReport = AvroTaskletReport.newBuilder()
+            .setReportType(AvroReportType.TaskletFailure)
+            .setTaskletReport(
+                AvroTaskletFailureReport.newBuilder()
+                    .setTaskletId(taskletFailureReport.getTaskletId())
+                    .setSerializedException(ByteBuffer.wrap(serializedException))
+                    .build())
+            .build();
+        break;
+      default:
+        throw new RuntimeException("Undefined message type");
+      }
+
+      workerTaskletReports.add(avroTaskletReport);
     }
 
+    // Convert WorkerReport message to Avro message.
+    final AvroWorkerReport avroWorkerReport = AvroWorkerReport.newBuilder()
+        .setTaskletReports(workerTaskletReports)
+        .build();
+
     // Serialize the Avro message to byte array.
     return toBytes(avroWorkerReport, AvroWorkerReport.class);
   }
@@ -172,33 +184,41 @@ public final class VortexAvroUtils {
    * @return De-serialized WorkerReport.
    */
   public static WorkerReport toWorkerReport(final byte[] bytes) {
-    final WorkerReport workerReport;
     final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class);
-    switch (avroWorkerReport.getReportType()) {
-    case TaskletResult:
-      final AvroTaskletResultReport taskletResultReport =
-          (AvroTaskletResultReport)avroWorkerReport.getTaskletReport();
-      // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
-      final Serializable output =
-          (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
-      workerReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
-      break;
-    case TaskletCancelled:
-      final AvroTaskletCancelledReport taskletCancelledReport =
-          (AvroTaskletCancelledReport)avroWorkerReport.getTaskletReport();
-      workerReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId());
-      break;
-    case TaskletFailure:
-      final AvroTaskletFailureReport taskletFailureReport =
-          (AvroTaskletFailureReport)avroWorkerReport.getTaskletReport();
-      final Exception exception =
-          (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
-      workerReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
-      break;
-    default:
-      throw new RuntimeException("Undefined WorkerReport type");
+    final List<TaskletReport> workerTaskletReports = new ArrayList<>();
+
+    for (final AvroTaskletReport avroTaskletReport : avroWorkerReport.getTaskletReports()) {
+      final TaskletReport taskletReport;
+
+      switch (avroTaskletReport.getReportType()) {
+      case TaskletResult:
+        final AvroTaskletResultReport taskletResultReport =
+            (AvroTaskletResultReport)avroTaskletReport.getTaskletReport();
+        // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+        final Serializable output =
+            (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
+        taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
+        break;
+      case TaskletCancelled:
+        final AvroTaskletCancelledReport taskletCancelledReport =
+            (AvroTaskletCancelledReport)avroTaskletReport.getTaskletReport();
+        taskletReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId());
+        break;
+      case TaskletFailure:
+        final AvroTaskletFailureReport taskletFailureReport =
+            (AvroTaskletFailureReport)avroTaskletReport.getTaskletReport();
+        final Exception exception =
+            (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
+        taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
+        break;
+      default:
+        throw new RuntimeException("Undefined TaskletReport type");
+      }
+
+      workerTaskletReports.add(taskletReport);
     }
-    return workerReport;
+
+    return new WorkerReport(workerTaskletReports);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
index 192299a..7c88a44 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
@@ -19,30 +19,34 @@
 package org.apache.reef.vortex.common;
 
 import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * Worker-to-Master protocol.
+ * A report of Tasklet statuses sent form the {@link org.apache.reef.vortex.evaluator.VortexWorker}
+ * to the {@link org.apache.reef.vortex.driver.VortexMaster}.
  */
+@Private
 @Unstable
-public interface WorkerReport extends Serializable {
-  /**
-   * Type of WorkerReport.
-   */
-  enum WorkerReportType {
-    TaskletResult,
-    TaskletCancelled,
-    TaskletFailure
-  }
+@DriverSide
+public final class WorkerReport implements Serializable {
+  private ArrayList<TaskletReport> taskletReports;
 
-  /**
-   * @return the type of this WorkerReport.
-   */
-  WorkerReportType getType();
+  public WorkerReport(final Collection<TaskletReport> taskletReports) {
+    this.taskletReports = new ArrayList<>(taskletReports);
+  }
 
   /**
-   * @return the taskletId of this WorkerReport.
+   * @return the list of Tasklet reports.
    */
-  int getTaskletId();
+  public List<TaskletReport> getTaskletReports() {
+    return Collections.unmodifiableList(taskletReports);
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index 75674cc..e3c24d2 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -28,11 +28,7 @@ import org.apache.reef.tang.Configurations;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.annotations.Unit;
 import org.apache.reef.vortex.api.VortexStart;
-import org.apache.reef.vortex.common.TaskletCancelledReport;
-import org.apache.reef.vortex.common.TaskletFailureReport;
-import org.apache.reef.vortex.common.TaskletResultReport;
-import org.apache.reef.vortex.common.VortexAvroUtils;
-import org.apache.reef.vortex.common.WorkerReport;
+import org.apache.reef.vortex.common.*;
 import org.apache.reef.vortex.evaluator.VortexWorker;
 import org.apache.reef.wake.EStage;
 import org.apache.reef.wake.EventHandler;
@@ -156,17 +152,22 @@ final class VortexDriver {
     public void onNext(final TaskMessage taskMessage) {
       final String workerId = taskMessage.getId();
       final WorkerReport workerReport = VortexAvroUtils.toWorkerReport(taskMessage.get());
-      switch (workerReport.getType()) {
+
+      // TODO[JIRA REEF-942]: Fix when aggregation is allowed.
+      assert workerReport.getTaskletReports().size() == 1;
+
+      final TaskletReport taskletReport = workerReport.getTaskletReports().get(0);
+      switch (taskletReport.getType()) {
       case TaskletResult:
-        final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
+        final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport;
         vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult());
         break;
       case TaskletCancelled:
-        final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport)workerReport;
+        final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport;
         vortexMaster.taskletCancelled(workerId, taskletCancelledReport.getTaskletId());
         break;
       case TaskletFailure:
-        final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
+        final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport;
         vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(),
             taskletFailureReport.getException());
         break;

http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index 10d23a8..920f1a9 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -35,6 +35,8 @@ import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.*;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -101,25 +103,31 @@ public final class VortexWorker implements Task, TaskMessageSource {
                     @Override
                     public void run() {
                       final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
+                      final WorkerReport workerReport;
+                      final List<TaskletReport> taskletReports = new ArrayList<>();
+
                       try {
                         // Command Executor: Execute the command
                         final Serializable result = taskletExecutionRequest.execute();
-                        final WorkerReport report =
+                        final TaskletReport taskletReport =
                             new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
-                        workerReports.addLast(VortexAvroUtils.toBytes(report));
+                        taskletReports.add(taskletReport);
                       } catch (final InterruptedException ex) {
                         // Assumes that user's thread follows convention that cancelled Futures
                         // should throw InterruptedException.
-                        final WorkerReport report = new TaskletCancelledReport(taskletExecutionRequest.getTaskletId());
+                        final TaskletReport taskletReport =
+                            new TaskletCancelledReport(taskletExecutionRequest.getTaskletId());
                         LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", vortexRequest.getTaskletId());
-                        workerReports.addLast(VortexAvroUtils.toBytes(report));
+                        taskletReports.add(taskletReport);
                       } catch (Exception e) {
                         // Command Executor: Tasklet throws an exception
-                        final WorkerReport report =
+                        final TaskletReport taskletReport =
                             new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
-                        workerReports.addLast(VortexAvroUtils.toBytes(report));
+                        taskletReports.add(taskletReport);
                       }
 
+                      workerReport = new WorkerReport(taskletReports);
+                      workerReports.addLast(VortexAvroUtils.toBytes(workerReport));
                       try {
                         latch.await();
                       } catch (final InterruptedException e) {