You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by bg...@apache.org on 2015/12/15 04:18:15 UTC
reef git commit: [REEF-1077] Allow VortexWorker reports to report
about more than a single Tasklet
Repository: reef
Updated Branches:
refs/heads/master ccecdd577 -> bc5e14dcc
[REEF-1077] Allow VortexWorker reports to report about more than a single Tasklet
This addressed the issue by
* Change the original WorkerReport to TaskletReport.
* Allow WorkerReports to hold a List of TaskletReports.
JIRA:
[REEF-1077](https://issues.apache.org/jira/browse/REEF-1077)
Pull Request:
Closes #729
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bc5e14dc
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bc5e14dc
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bc5e14dc
Branch: refs/heads/master
Commit: bc5e14dcc648b489d61bc09155fd523ecd6d276a
Parents: ccecdd5
Author: Andrew Chung <af...@apache.org>
Authored: Sun Dec 13 16:49:02 2015 -0800
Committer: Byung-Gon Chun <bg...@apache.org>
Committed: Tue Dec 15 12:16:29 2015 +0900
----------------------------------------------------------------------
.../reef-vortex/src/main/avro/WorkerReport.avsc | 23 ++-
.../vortex/common/TaskletCancelledReport.java | 6 +-
.../vortex/common/TaskletFailureReport.java | 8 +-
.../reef/vortex/common/TaskletReport.java | 52 +++++++
.../reef/vortex/common/TaskletResultReport.java | 8 +-
.../reef/vortex/common/VortexAvroUtils.java | 150 +++++++++++--------
.../apache/reef/vortex/common/WorkerReport.java | 34 +++--
.../apache/reef/vortex/driver/VortexDriver.java | 19 +--
.../reef/vortex/evaluator/VortexWorker.java | 20 ++-
9 files changed, 211 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
index 11adb12..182f88e 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
@@ -46,14 +46,16 @@
{
"namespace": "org.apache.reef.vortex.common.avro",
"type": "record",
- "name": "AvroWorkerReport",
+ "name": "AvroTaskletReport",
"fields": [
{
"name": "reportType",
- "type": {
+ "type":
+ {
"type": "enum",
"name": "AvroReportType",
- "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"]}
+ "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"]
+ }
},
{
"name": "taskletReport",
@@ -61,5 +63,20 @@
"default": null
}
]
+ },
+ {
+ "namespace": "org.apache.reef.vortex.common.avro",
+ "type": "record",
+ "name": "AvroWorkerReport",
+ "fields": [
+ {
+ "name": "taskletReports",
+ "type":
+ {
+ "type": "array",
+ "items": "AvroTaskletReport"
+ }
+ }
+ ]
}
]
http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
index b51219b..1ec1890 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
@@ -24,7 +24,7 @@ import org.apache.reef.annotations.Unstable;
* The report of a cancelled Tasklet.
*/
@Unstable
-public final class TaskletCancelledReport implements WorkerReport {
+public final class TaskletCancelledReport implements TaskletReport {
private int taskletId;
/**
@@ -35,8 +35,8 @@ public final class TaskletCancelledReport implements WorkerReport {
}
@Override
- public WorkerReportType getType() {
- return WorkerReportType.TaskletCancelled;
+ public TaskletReportType getType() {
+ return TaskletReportType.TaskletCancelled;
}
@Override
http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
index 96df2e1..cbf6953 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
@@ -24,7 +24,7 @@ import org.apache.reef.annotations.Unstable;
* Report of a tasklet exception.
*/
@Unstable
-public final class TaskletFailureReport implements WorkerReport {
+public final class TaskletFailureReport implements TaskletReport {
private final int taskletId;
private final Exception exception;
@@ -38,11 +38,11 @@ public final class TaskletFailureReport implements WorkerReport {
}
/**
- * @return the type of this WorkerReport.
+ * @return the type of this TaskletReport.
*/
@Override
- public WorkerReportType getType() {
- return WorkerReportType.TaskletFailure;
+ public TaskletReportType getType() {
+ return TaskletReportType.TaskletFailure;
}
/**
http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
new file mode 100644
index 0000000..196f597
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.Serializable;
+
+/**
+ * The interface for a status report from the {@link org.apache.reef.vortex.evaluator.VortexWorker}.
+ */
+@Unstable
+@Private
+@DriverSide
+public interface TaskletReport extends Serializable {
+ /**
+ * Type of TaskletReport.
+ */
+ enum TaskletReportType {
+ TaskletResult,
+ TaskletCancelled,
+ TaskletFailure
+ }
+
+ /**
+ * @return the type of this TaskletReport.
+ */
+ TaskletReportType getType();
+
+ /**
+ * @return the taskletId of this TaskletReport.
+ */
+ int getTaskletId();
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
index cd3a597..de05185 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
* Report of a tasklet execution result.
*/
@Unstable
-public final class TaskletResultReport<TOutput extends Serializable> implements WorkerReport {
+public final class TaskletResultReport<TOutput extends Serializable> implements TaskletReport {
private final int taskletId;
private final TOutput result;
@@ -40,11 +40,11 @@ public final class TaskletResultReport<TOutput extends Serializable> implements
}
/**
- * @return the type of this WorkerReport.
+ * @return the type of this TaskletReport.
*/
@Override
- public WorkerReportType getType() {
- return WorkerReportType.TaskletResult;
+ public TaskletReportType getType() {
+ return TaskletReportType.TaskletResult;
}
/**
http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
index 4ab4cb5..660f059 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -29,6 +29,8 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
/**
* Serialize and deserialize Vortex message to/from byte array.
@@ -86,48 +88,58 @@ public final class VortexAvroUtils {
* @return Serialized byte array.
*/
public static byte[] toBytes(final WorkerReport workerReport) {
- // Convert WorkerReport message to Avro message.
- final AvroWorkerReport avroWorkerReport;
- switch (workerReport.getType()) {
- case TaskletResult:
- final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
- // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
- final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult());
- avroWorkerReport = AvroWorkerReport.newBuilder()
- .setReportType(AvroReportType.TaskletResult)
- .setTaskletReport(
- AvroTaskletResultReport.newBuilder()
- .setTaskletId(taskletResultReport.getTaskletId())
- .setSerializedOutput(ByteBuffer.wrap(serializedOutput))
- .build())
- .build();
- break;
- case TaskletCancelled:
- final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) workerReport;
- avroWorkerReport = AvroWorkerReport.newBuilder()
- .setReportType(AvroReportType.TaskletCancelled)
- .setTaskletReport(
- AvroTaskletCancelledReport.newBuilder()
- .setTaskletId(workerReport.getTaskletId())
- .build())
- .build();
- break;
- case TaskletFailure:
- final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
- final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException());
- avroWorkerReport = AvroWorkerReport.newBuilder()
- .setReportType(AvroReportType.TaskletFailure)
- .setTaskletReport(
- AvroTaskletFailureReport.newBuilder()
- .setTaskletId(taskletFailureReport.getTaskletId())
- .setSerializedException(ByteBuffer.wrap(serializedException))
- .build())
- .build();
- break;
- default:
- throw new RuntimeException("Undefined message type");
+ final List<AvroTaskletReport> workerTaskletReports = new ArrayList<>();
+
+ for (final TaskletReport taskletReport : workerReport.getTaskletReports()) {
+ final AvroTaskletReport avroTaskletReport;
+ switch (taskletReport.getType()) {
+ case TaskletResult:
+ final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport;
+ // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+ final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult());
+ avroTaskletReport = AvroTaskletReport.newBuilder()
+ .setReportType(AvroReportType.TaskletResult)
+ .setTaskletReport(
+ AvroTaskletResultReport.newBuilder()
+ .setTaskletId(taskletResultReport.getTaskletId())
+ .setSerializedOutput(ByteBuffer.wrap(serializedOutput))
+ .build())
+ .build();
+ break;
+ case TaskletCancelled:
+ final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport;
+ avroTaskletReport = AvroTaskletReport.newBuilder()
+ .setReportType(AvroReportType.TaskletCancelled)
+ .setTaskletReport(
+ AvroTaskletCancelledReport.newBuilder()
+ .setTaskletId(taskletCancelledReport.getTaskletId())
+ .build())
+ .build();
+ break;
+ case TaskletFailure:
+ final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport;
+ final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException());
+ avroTaskletReport = AvroTaskletReport.newBuilder()
+ .setReportType(AvroReportType.TaskletFailure)
+ .setTaskletReport(
+ AvroTaskletFailureReport.newBuilder()
+ .setTaskletId(taskletFailureReport.getTaskletId())
+ .setSerializedException(ByteBuffer.wrap(serializedException))
+ .build())
+ .build();
+ break;
+ default:
+ throw new RuntimeException("Undefined message type");
+ }
+
+ workerTaskletReports.add(avroTaskletReport);
}
+ // Convert WorkerReport message to Avro message.
+ final AvroWorkerReport avroWorkerReport = AvroWorkerReport.newBuilder()
+ .setTaskletReports(workerTaskletReports)
+ .build();
+
// Serialize the Avro message to byte array.
return toBytes(avroWorkerReport, AvroWorkerReport.class);
}
@@ -172,33 +184,41 @@ public final class VortexAvroUtils {
* @return De-serialized WorkerReport.
*/
public static WorkerReport toWorkerReport(final byte[] bytes) {
- final WorkerReport workerReport;
final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class);
- switch (avroWorkerReport.getReportType()) {
- case TaskletResult:
- final AvroTaskletResultReport taskletResultReport =
- (AvroTaskletResultReport)avroWorkerReport.getTaskletReport();
- // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
- final Serializable output =
- (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
- workerReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
- break;
- case TaskletCancelled:
- final AvroTaskletCancelledReport taskletCancelledReport =
- (AvroTaskletCancelledReport)avroWorkerReport.getTaskletReport();
- workerReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId());
- break;
- case TaskletFailure:
- final AvroTaskletFailureReport taskletFailureReport =
- (AvroTaskletFailureReport)avroWorkerReport.getTaskletReport();
- final Exception exception =
- (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
- workerReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
- break;
- default:
- throw new RuntimeException("Undefined WorkerReport type");
+ final List<TaskletReport> workerTaskletReports = new ArrayList<>();
+
+ for (final AvroTaskletReport avroTaskletReport : avroWorkerReport.getTaskletReports()) {
+ final TaskletReport taskletReport;
+
+ switch (avroTaskletReport.getReportType()) {
+ case TaskletResult:
+ final AvroTaskletResultReport taskletResultReport =
+ (AvroTaskletResultReport)avroTaskletReport.getTaskletReport();
+ // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+ final Serializable output =
+ (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
+ taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
+ break;
+ case TaskletCancelled:
+ final AvroTaskletCancelledReport taskletCancelledReport =
+ (AvroTaskletCancelledReport)avroTaskletReport.getTaskletReport();
+ taskletReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId());
+ break;
+ case TaskletFailure:
+ final AvroTaskletFailureReport taskletFailureReport =
+ (AvroTaskletFailureReport)avroTaskletReport.getTaskletReport();
+ final Exception exception =
+ (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
+ taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
+ break;
+ default:
+ throw new RuntimeException("Undefined TaskletReport type");
+ }
+
+ workerTaskletReports.add(taskletReport);
}
- return workerReport;
+
+ return new WorkerReport(workerTaskletReports);
}
/**
http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
index 192299a..7c88a44 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
@@ -19,30 +19,34 @@
package org.apache.reef.vortex.common;
import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
/**
* Worker-to-Master protocol.
+ * A report of Tasklet statuses sent form the {@link org.apache.reef.vortex.evaluator.VortexWorker}
+ * to the {@link org.apache.reef.vortex.driver.VortexMaster}.
*/
+@Private
@Unstable
-public interface WorkerReport extends Serializable {
- /**
- * Type of WorkerReport.
- */
- enum WorkerReportType {
- TaskletResult,
- TaskletCancelled,
- TaskletFailure
- }
+@DriverSide
+public final class WorkerReport implements Serializable {
+ private ArrayList<TaskletReport> taskletReports;
- /**
- * @return the type of this WorkerReport.
- */
- WorkerReportType getType();
+ public WorkerReport(final Collection<TaskletReport> taskletReports) {
+ this.taskletReports = new ArrayList<>(taskletReports);
+ }
/**
- * @return the taskletId of this WorkerReport.
+ * @return the list of Tasklet reports.
*/
- int getTaskletId();
+ public List<TaskletReport> getTaskletReports() {
+ return Collections.unmodifiableList(taskletReports);
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index 75674cc..e3c24d2 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -28,11 +28,7 @@ import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.vortex.api.VortexStart;
-import org.apache.reef.vortex.common.TaskletCancelledReport;
-import org.apache.reef.vortex.common.TaskletFailureReport;
-import org.apache.reef.vortex.common.TaskletResultReport;
-import org.apache.reef.vortex.common.VortexAvroUtils;
-import org.apache.reef.vortex.common.WorkerReport;
+import org.apache.reef.vortex.common.*;
import org.apache.reef.vortex.evaluator.VortexWorker;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
@@ -156,17 +152,22 @@ final class VortexDriver {
public void onNext(final TaskMessage taskMessage) {
final String workerId = taskMessage.getId();
final WorkerReport workerReport = VortexAvroUtils.toWorkerReport(taskMessage.get());
- switch (workerReport.getType()) {
+
+ // TODO[JIRA REEF-942]: Fix when aggregation is allowed.
+ assert workerReport.getTaskletReports().size() == 1;
+
+ final TaskletReport taskletReport = workerReport.getTaskletReports().get(0);
+ switch (taskletReport.getType()) {
case TaskletResult:
- final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
+ final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport;
vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult());
break;
case TaskletCancelled:
- final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport)workerReport;
+ final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport;
vortexMaster.taskletCancelled(workerId, taskletCancelledReport.getTaskletId());
break;
case TaskletFailure:
- final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
+ final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport;
vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(),
taskletFailureReport.getException());
break;
http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index 10d23a8..920f1a9 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -35,6 +35,8 @@ import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -101,25 +103,31 @@ public final class VortexWorker implements Task, TaskMessageSource {
@Override
public void run() {
final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
+ final WorkerReport workerReport;
+ final List<TaskletReport> taskletReports = new ArrayList<>();
+
try {
// Command Executor: Execute the command
final Serializable result = taskletExecutionRequest.execute();
- final WorkerReport report =
+ final TaskletReport taskletReport =
new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
- workerReports.addLast(VortexAvroUtils.toBytes(report));
+ taskletReports.add(taskletReport);
} catch (final InterruptedException ex) {
// Assumes that user's thread follows convention that cancelled Futures
// should throw InterruptedException.
- final WorkerReport report = new TaskletCancelledReport(taskletExecutionRequest.getTaskletId());
+ final TaskletReport taskletReport =
+ new TaskletCancelledReport(taskletExecutionRequest.getTaskletId());
LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", vortexRequest.getTaskletId());
- workerReports.addLast(VortexAvroUtils.toBytes(report));
+ taskletReports.add(taskletReport);
} catch (Exception e) {
// Command Executor: Tasklet throws an exception
- final WorkerReport report =
+ final TaskletReport taskletReport =
new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
- workerReports.addLast(VortexAvroUtils.toBytes(report));
+ taskletReports.add(taskletReport);
}
+ workerReport = new WorkerReport(taskletReports);
+ workerReports.addLast(VortexAvroUtils.toBytes(workerReport));
try {
latch.await();
} catch (final InterruptedException e) {